Kotlin Coroutines(協程) 完全解析(五),協程的並發

收藏待读

Kotlin Coroutines(協程) 完全解析(五),協程的並發

Kotlin Coroutines(協程) 完全解析系列:

Kotlin Coroutines(協程) 完全解析(一),協程簡介

Kotlin Coroutines(協程) 完全解析(二),深入理解協程的掛起、恢復與調度

Kotlin Coroutines(協程) 完全解析(三),封裝異步回調、協程間關係及協程的取消

Kotlin Coroutines(協程) 完全解析(四),協程的異常處理

Kotlin Coroutines(協程) 完全解析(五),協程的並發

本文基於 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1

通過前面幾篇文章可以明白協程就是可以掛起和恢復執行的運算邏輯,掛起函數用狀態機的方式用掛起點將協程的運算邏輯拆分為不同的片段,每次運行協程執行的不同的邏輯片段。所以協程在運行時只是線程中的一塊代碼,線程的並發處理方式都可以用在協程上。不過協程還提供兩種特有的方式,一是不阻塞線程的互斥鎖 Mutex ,一是通過 ThreadLocal 實現的協程局部數據。

1. Mutex

線程中鎖都是阻塞式,在沒有獲取鎖時無法執行其他邏輯,而協程可以通過掛起函數解決這個,沒有獲取鎖就掛起協程,獲取後再恢復協程,協程掛起時線程並沒有阻塞可以執行其他邏輯。這種互斥鎖就是 Mutex ,它與 synchronized 關鍵字有些類似,還提供了 withLock 擴展函數,替代常用的 mutex.lock; try {...} finally { mutex.unlock() } :

fun main(args:Array) = runBlocking {
    val mutex = Mutex()
    var counter = 0
    repeat(10000) {
        GlobalScope.launch {
            mutex.withLock {
                counter ++
            }
        }
    }
    println("The final count is$counter")
}

Mutex 的使用比較簡單,不過需要注意的是多個協程競爭的應該是同一個 Mutex 互斥鎖。

2. 協程局部數據

線程中可以使用ThreadLocal作為線程局部數據,每個線程中的數據都是獨立的。協程中可以通過 ThreadLocal.asContextElement() 擴展函數實現協程局部數據,每次協程切換會恢復之前的值。先看下面的示例:

fun main(args:Array) = runBlocking {
    val threadLocal = ThreadLocal().apply { set("Init") }
    printlnValue(threadLocal)
    val job = GlobalScope.launch(threadLocal.asContextElement("launch")) {
        printlnValue(threadLocal)
        threadLocal.set("launch changed")
        printlnValue(threadLocal)
        yield()
        printlnValue(threadLocal)
    }
    job.join()
    printlnValue(threadLocal)
}

private fun printlnValue(threadLocal:ThreadLocal) {
    println("${Thread.currentThread()}thread local value:${threadLocal.get()}")
}

輸出如下:

Thread[main,5,main] thread local value: Init
Thread[DefaultDispatcher-worker-1,5,main] thread local value: launch
Thread[DefaultDispatcher-worker-1,5,main] thread local value: launch changed
Thread[DefaultDispatcher-worker-2,5,main] thread local value: launch
Thread[main,5,main] thread local value: Init

上面的輸出有個疑問的地方,為什麼執行 yield() 掛起函數後 threadLocal 的值不是 launch changed 而變回了 launch ?

下面直接分析源碼:

// 注意這裡 value 的默認值是 ThreadLocal 當前值
public fun ThreadLocal.asContextElement(value:T=get()): ThreadContextElement =
    ThreadLocalElement(value, this)

internal class ThreadLocalElement(
    private val value: T,
    private val threadLocal: ThreadLocal
) : ThreadContextElement {
    override val key: CoroutineContext.Key = ThreadLocalKey(threadLocal)

    override fun updateThreadContext(context:CoroutineContext): T {
        val oldState = threadLocal.get()
// 設置 threadLocal 的值為 value 前先保存了之前的值
        threadLocal.set(value)
        return oldState
    }

    override fun restoreThreadContext(context:CoroutineContext, oldState:T) {
// 將 threadLocal 修改為之前保存的值
        threadLocal.set(oldState)
    }
    ...
}

// 協程啟動和恢復都會用此函數包裝,在 Dispatched.run()、DisptchedContinuation.resumeWith() 、
// DisptchedContinuation.resumeUndispatched() 等協程啟動和恢復的地方都可以發現此函數的蹤影
internal actual inline fun withCoroutineContext(context:CoroutineContext, countOrElement:Any?, block: () -> T): T {
// updateThreadContext() 函數會調用到 ThreadContextElement.updateThreadContext(context)
// oldValue 是 threadLocal 之前的值
    val oldValue = updateThreadContext(context, countOrElement)
    try {
        return block()
    } finally {
// restoreThreadContext() 函數會調用到 ThreadContextElement.restoreThreadContext(context, oldValue)
        restoreThreadContext(context, oldValue)
    }
}

根據上面的源碼和斷點調試,可以發現協程的啟動和恢復都會執行一次 ThreadContextElement.updateThreadContext(context)ThreadContextElement.restoreThreadContext(context, oldValue) ,現在再分析一次上面的代碼運行:

fun main(args:Array) = runBlocking {
    val threadLocal = ThreadLocal().apply { set("Init") }
// 此時在 main 線程,threadLocal 的值為 Init
    printlnValue(threadLocal)
    val job = GlobalScope.launch(threadLocal.asContextElement("launch")) {
// 啟動協程後,切換到 DefaultDispatcher-worker-1 線程,threadLocal 在該線程的值為 null
        // 調用 updateThreadContext() 設置 threadLocal 的值為 launch,保存之前的為 null
        printlnValue(threadLocal)
        // 在 DefaultDispatcher-worker-1 線程,修改 threadLocal 的值為 launch changed
        threadLocal.set("launch changed")
        printlnValue(threadLocal)
        // yield() 掛起函數會掛起當前協程,並將協程分發到 Dispatcher.Default 的隊列中等待恢復
        // 掛起協程後調用 restoreThreadContext() 修改 threadLocal 為 null
        yield()
// 恢復協程後,此時在 DefaultDispatcher-worker-2 線程,threadLocal 的值為 null
        // 再次調用 updateThreadContext() 設置 threadLocal 的值為 launch,保存之前的為 null
        printlnValue(threadLocal)
        // 結束協程後,restoreThreadContext() 修改 threadLocal 為 null
    }
    job.join()
    // 此時已經從 DefaultDispatcher-worker-2 線程切換回 main 線程,main 線程中的 threadlocal 沒有修改過,還是為 Init
    printlnValue(threadLocal)
}

private fun printlnValue(threadLocal:ThreadLocal) {
    println("${Thread.currentThread()}thread local value:${threadLocal.get()}")
}

所以 ThreadContextElement 並不能跟蹤所有 ThreadLocal 對象的訪問,而且每次掛起時更新的值將丟失。最重要的牢記它的原理: 啟動和恢復時保存 ThreadLocal 在當前線程的值,並修改為 value,掛起和結束時修改當前線程 ThreadLocal 的值為之前保存的值。

3. 已有線程同步方式

  • 最簡單的 synchronized 關鍵字

  • ReentrantLock 等 java.util.concurrent.locks 包中鎖

  • AtomicInteger 等 java.util.concurrent.atomic 包中的原子類

  • ConcurrentHashMap 等線程安全的集合

協程中的並發與線程的並發大部分是相同的,所以本篇文章應該是目前為止該系列文章中最容易理解的一篇,本系列 Kotlin Coroutines(協程) 完全解析 暫時就到這裡,後面待 select 表達式、Channel、Actor 等實驗性內容正式發佈後繼續解析,還有在 Android 項目中協程的實際運用,敬請期待。

原文 : Johnny Shieh

相關閱讀

免责声明:本文内容来源于Johnny Shieh,已注明原文出处和链接,文章观点不代表立场,如若侵犯到您的权益,或涉不实谣言,敬请向我们提出检举。