※基本的執行緒池
// ExecutorService es = Executors.newFixedThreadPool(3); // ExecutorService es = Executors.newCachedThreadPool(); ExecutorService es = Executors.newSingleThreadExecutor(); for (int i = 1; i <= 7; i++) { es.execute(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } for (int j = 1; j <= 5; j++) { System.out.println(Thread.currentThread().getName() + "->" + j); } }); } es.shutdown();
※newFixedThreadPool:固定大小的執行緒
newCachedThreadPool:無限大小的執行緒
newSingleThreadExecutor:單一的執行緒
※shutdown():不增加新的執行緒,當前執行緒不會關閉
shutdownNow():不增加新的執行緒,當前執行緒也會關閉
※排程
ScheduledExecutorService ses = Executors.newScheduledThreadPool(3); ses.schedule(() -> { System.out.println("yeah"); }, 2L, TimeUnit.SECONDS); ses.shutdown(); ScheduledExecutorService sess = Executors.newScheduledThreadPool(3); sess.scheduleAtFixedRate(() -> { System.out.println("xxx"); }, 2L, 1L, TimeUnit.SECONDS); try { Thread.sleep(5000L); sess.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } ScheduledExecutorService seas = Executors.newSingleThreadScheduledExecutor(); seas.scheduleAtFixedRate(() -> { System.out.println("xxx"); }, 2L, 1L, TimeUnit.SECONDS); try { Thread.sleep(5000L); seas.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); }
※newScheduledThreadPool、newSingleThreadScheduledExecutor 回傳 ScheduledExecutorService,裡面有執行一次的 schedule() 和重覆執行的 scheduleAtFixedRate()
※重覆執行要注意太早 shutdown() 就不執行了
※1.8 的 newWorkStealingPool
Executors.newWorkStealingPool();
※底層第一行就是 new ForkJoinPool,ForkJoinPool 是 1.7 增加的類別,但這個執行緒池是 1.8 才新增的,可參考這裡
※ThreadPoolExecutor
Executors 底層使用的就是這個,共有7個參數
new ThreadPoolExecutor(5,
20,
10,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
表示一開始就有 5 個執行緒可以用,最多20個
假設有30個 request 進來,5個先執行,然後25個會放在第5個參數的 queue 裡
再來15個執行緒去 queue 抓來執行,最後10個看第7個參數的放棄策略
執行完的15個執行緒,過10毫秒 (第3和4的參數) 就關閉
第六個參數是將 Runnable 包裝成 Thread
以下是第 5~7 可用的參數
BlockingQueue
ArrayBlockingQueue
DelayedWorkQueue 基於 heap 的資料結構,按照時間順序將每個任務進行排序
BlockingDeque
LinkedBlockingDeque 預設是 Integer.MAX_VALUE,所以有 OOM 的風險
SynchronousQueue 一生產就消費,如沒有消費者就阻塞,所以沒有容量大小
DelayQueue 只能消費已過期的
TransferQueue
LinkedTransferQueue 很像 SynchronousQueue,但有容量,消費者沒取到就會阻塞
LinkedBlockingQueue 預設是 Integer.MAX_VALUE,所以有 OOM 的風險
PriorityBlockingQueue 按照自定義的優先級消費
ThreadFactory
DaemonThreadFactory 創建背景執行的執行緒,之後出來的執行緒都是背景執行緒
DefaultThreadFactory 預設的執行緒工廠
PrivilegedThreadFactory 創建與當前執行緒具有相同權限的新執行緒
RejectedExecutionHandler
DiscardOldestPolicy 丟棄最老的任務,就是在 queue 裡面的
AbortPolicy 新來的任務直接丟棄並報異常
CallerRunsPolicy 同步調用,start() 調用後執行 run(),但這個策略是直接調用 run()
DiscardPolicy 新來的任務直接丟棄但不報異常
沒有留言:
張貼留言