Java 線程池框架核心代碼分析

收藏待读

Java 線程池框架核心代碼分析

前言

多線程編程中,為每個任務分配一個線程是不現實的,線程創建的開銷和資源消耗都是很高的。線程池應運而生,成為我們管理線程的利器。Java 通過 Executor 接口,提供了一種標準的方法將任務的提交過程和執行過程解耦開來,並用 Runnable 表示任務。

下面,我們來分析一下 Java 線程池框架的實現 ThreadPoolExecutor

下面的分析基於JDK1.7

生命周期

ThreadPoolExecutor 中,使用 CAPACITY 的高3位來表示運行狀態,分別是:

terminated()
terminated()

Java 線程池框架核心代碼分析

ThreadPoolExecutor 中用原子類來表示狀態位

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

線程池模型

核心參數

  • corePoolSize :最小存活的工作線程數量(如果設置 allowCoreThreadTimeOut ,那麼該值為 0)
  • maximumPoolSize :最大的線程數量,受限於 CAPACITY
  • keepAliveTime :對應線程的存活時間,時間單位由TimeUnit指定
  • workQueue :工作隊列,存儲待執行的任務
  • RejectExecutionHandler :拒絕策略,線程池滿後會觸發

線程池的最大容量: CAPACITY 中的前三位用作標誌位,也就是說工作線程的最大容量為 (2^29)-1

四種模型

CachedThreadPool
FixedThreadPool
SingleThreadPool
ScheduledThreadPool

執行任務 execute

核心邏輯:

  1. 當前線程數量 < corePoolSize ,直接開啟新的核心線程執行任務 addWorker(command, true)
  2. 當前線程數量 >= corePoolSize ,且任務加入工作隊列成功

    RUNNING
    
  3. 開啟普通線程執行任務 addWorker(command, false) ,開啟失敗就拒絕該任務

從上面的分析可以總結出線程池運行的四個階段:

  1. poolSize < corePoolSize 且隊列為空,此時會新建線程來處理提交的任務
  2. poolSize == corePoolSize ,此時提交的任務進入工作隊列,工作線程從隊列中獲取任務執行,此時隊列不為空且未滿。
  3. poolSize == corePoolSize ,並且隊列已滿,此時也會新建線程來處理提交的任務,但是 poolSize < maxPoolSize
  4. poolSize == maxPoolSize ,並且隊列已滿,此時會觸發拒絕策略

拒絕策略

前面我們提到任務無法執行會被拒絕, RejectedExecutionHandler 是處理被拒絕任務的接口。下面是四種拒絕策略。

AbortPolicy
CallerRunsPolicy
DiscardPolicy
DiscardOldersPolicy

線程池中的 Worker

Worker 繼承了 AbstractQueuedSynchronizerRunnable ,前者給 Worker 提供鎖的功能,後者執行工作線程的主要方法 runWorker(Worker w) (從任務隊列撈任務執行)。Worker 引用存在 workers 集合裏面,用 mainLock 守護。

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet workers = new HashSet();

核心函數 runWorker

下面是簡化的邏輯,注意:每個工作線程的 run 都執行下面的函數

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    while (task != null || (task = getTask()) != null) {
        w.lock();
        beforeExecute(wt, task);
        task.run();
        afterExecute(task, thrown);
        w.unlock();
    }
    processWorkerExit(w, completedAbruptly);
}
  1. getTask() 中獲取任務
  2. 鎖住 worker
  3. 執行 beforeExecute(wt, task) ,這是 ThreadPoolExecutor 提供給子類的擴展方法
  4. 運行任務,如果該worker有配置了首次任務,則先執行首次任務且只執行一次。
  5. 執行 afterExecute(task, thrown);
  6. 解鎖 worker
  7. 如果獲取到的任務為 null,關閉 worker

獲取任務 getTask

線程池內部的任務隊列是一個阻塞隊列,具體實現在構造時傳入。

private final BlockingQueue workQueue;

getTask() 從任務隊列中獲取任務,支持阻塞和超時等待任務,四種情況會導致返回 null ,讓 worker 關閉。

STOP
SHUTDOWN

核心邏輯:根據 timed 在阻塞隊列上超時等待或者阻塞等待任務,等待任務超時會導致工作線程被關閉。

timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();

在以下兩種情況下等待任務會超時:

allowCoreThreadTimeOut(true)
wc > corePoolSize

工作隊列使用的是 BlockingQueue ,這裡就不展開了,後面再寫一篇詳細的分析。

總結

  • ThreadPoolExecutor 基於生產者-消費者模式,提交任務的操作相當於生產者,執行任務的線程相當於消費者。
  • Executors 提供了四種基於 ThreadPoolExecutor 構造線程池模型的方法,除此之外,我們還可以直接繼承 ThreadPoolExecutor ,重寫 beforeExecuteafterExecute 方法來定製線程池任務執行過程。
  • 使用有界隊列還是無界隊列需要根據具體情況考慮,工作隊列的大小和線程的數量也是需要好好考慮的。
  • 拒絕策略推薦使用 CallerRunsPolicy ,該策略不會拋棄任務,也不會拋出異常,而是將任務回退到調用者線程中執行。

原文 : ShareHub

相關閱讀

免责声明:本文内容来源于ShareHub,已注明原文出处和链接,文章观点不代表立场,如若侵犯到您的权益,或涉不实谣言,敬请向我们提出检举。