Linux中國

一些常見的並發編程錯誤

Go 是一個內置支持並發編程的語言。藉助使用 go 關鍵字去創建 協程 goroutine (輕量級線程)和在 Go 中提供的 使用 信道其它的並發 同步方法,使得並發編程變得很容易、很靈活和很有趣。

另一方面,Go 並不會阻止一些因 Go 程序員粗心大意或者缺乏經驗而造成的並發編程錯誤。在本文的下面部分將展示一些在 Go 編程中常見的並發編程錯誤,以幫助 Go 程序員們避免再犯類似的錯誤。

需要同步的時候沒有同步

代碼行或許 不是按出現的順序運行的

在下面的程序中有兩個錯誤。

  • 第一,在 main 協程中讀取 b 和在新的 協程 中寫入 b 可能導致數據爭用。
  • 第二,條件 b == true 並不能保證在 main 協程 中的 a != nil。在新的協程中編譯器和 CPU 可能會通過 重排序指令 進行優化,因此,在運行時 b 賦值可能發生在 a 賦值之前,在 main 協程 中當 a 被修改後,它將會讓部分 a 一直保持為 nil
package main

import (
    "time"
    "runtime"
)

func main() {
    var a []int // nil
    var b bool  // false

    // a new goroutine
    go func () {
        a = make([]int, 3)
        b = true // write b
    }()

    for !b { // read b
        time.Sleep(time.Second)
        runtime.Gosched()
    }
    a[0], a[1], a[2] = 0, 1, 2 // might panic
}

上面的程序或者在一台計算機上運行的很好,但是在另一台上可能會引發異常。或者它可能運行了 N 次都很好,但是可能在第 (N+1) 次引發了異常。

我們將使用 sync 標準包中提供的信道或者同步方法去確保內存中的順序。例如,

package main

func main() {
    var a []int = nil
    c := make(chan struct{})

    // a new goroutine
    go func () {
        a = make([]int, 3)
        c <- struct{}{}
    }()

    <-c
    a[0], a[1], a[2] = 0, 1, 2
}

使用 time.Sleep 調用去做同步

我們先來看一個簡單的例子。

package main

import (
    "fmt"
    "time"
)

func main() {
    var x = 123

    go func() {
        x = 789 // write x
    }()

    time.Sleep(time.Second)
    fmt.Println(x) // read x
}

我們預期程序將列印出 789。如果我們運行它,通常情況下,它確定列印的是 789。但是,這個程序使用的同步方式好嗎?No!原因是 Go 運行時並不保證 x 的寫入一定會發生在 x 的讀取之前。在某些條件下,比如在同一個操作系統上,大部分 CPU 資源被其它運行的程序所佔用的情況下,寫入 x 可能就會發生在讀取 x 之後。這就是為什麼我們在正式的項目中,從來不使用 time.Sleep 調用去實現同步的原因。

我們來看一下另外一個示例。

package main

import (
    "fmt"
    "time"
)

var x = 0

func main() {
    var num = 123
    var p = &num

    c := make(chan int)

    go func() {
        c <- *p + x
    }()

    time.Sleep(time.Second)
    num = 789
    fmt.Println(<-c)
}

你認為程序的預期輸出是什麼?123 還是 789?事實上它的輸出與編譯器有關。對於標準的 Go 編譯器 1.10 來說,這個程序很有可能輸出是 123。但是在理論上,它可能輸出的是 789,或者其它的隨機數。

現在,我們來改變 c <- *p + xc <- *p,然後再次運行這個程序。你將會發現輸出變成了 789 (使用標準的 Go 編譯器 1.10)。這再次說明它的輸出是與編譯器相關的。

是的,在上面的程序中存在數據爭用。表達式 *p 可能會被先計算、後計算、或者在處理賦值語句 num = 789 時計算。time.Sleep 調用並不能保證 *p 發生在賦值語句處理之前進行。

對於這個特定的示例,我們將在新的協程創建之前,將值保存到一個臨時值中,然後在新的協程中使用臨時值去消除數據爭用。

...
    tmp := *p + x
    go func() {
        c <- tmp
    }()
...

使協程掛起

掛起協程是指讓協程一直處於阻塞狀態。導致協程被掛起的原因很多。比如,

  • 一個協程嘗試從一個 nil 信道中或者從一個沒有其它協程給它發送值的信道中檢索數據。
  • 一個協程嘗試去發送一個值到 nil 信道,或者發送到一個沒有其它的協程接收值的信道中。
  • 一個協程被它自己死鎖。
  • 一組協程彼此死鎖。
  • 當運行一個沒有 default 分支的 select 代碼塊時,一個協程被阻塞,以及在 select 代碼塊中 case 關鍵字後的所有信道操作保持阻塞狀態。

除了有時我們為了避免程序退出,特意讓一個程序中的 main 協程保持掛起之外,大多數其它的協程掛起都是意外情況。Go 運行時很難判斷一個協程到底是處於掛起狀態還是臨時阻塞。因此,Go 運行時並不會去釋放一個掛起的協程所佔用的資源。

誰先響應誰獲勝 的信道使用案例中,如果使用的 future 信道容量不夠大,當嘗試向 Future 信道發送結果時,一些響應較慢的信道將被掛起。比如,如果調用下面的函數,將有 4 個協程處於永遠阻塞狀態。

func request() int {
    c := make(chan int)
    for i := 0; i < 5; i++ {
        i := i
        go func() {
            c <- i // 4 goroutines will hang here.
        }()
    }
    return <-c
}

為避免這 4 個協程一直處於掛起狀態, c 信道的容量必須至少是 4

實現誰先響應誰獲勝的第二種方法 的信道使用案例中,如果將 future 信道用做非緩衝信道,那麼有可能這個信息將永遠也不會有響應而掛起。例如,如果在一個協程中調用下面的函數,協程可能會掛起。原因是,如果接收操作 <-c 準備就緒之前,五個發送操作全部嘗試發送,那麼所有的嘗試發送的操作將全部失敗,因此那個調用者協程將永遠也不會接收到值。

func request() int {
    c := make(chan int)
    for i := 0; i < 5; i++ {
        i := i
        go func() {
            select {
            case c <- i:
            default:
            }
        }()
    }
    return <-c
}

將信道 c 變成緩衝信道將保證五個發送操作中的至少一個操作會發送成功,這樣,上面函數中的那個調用者協程將不會被掛起。

sync 標準包中拷貝類型值

在實踐中,sync 標準包中的類型值不會被拷貝。我們應該只拷貝這個值的指針。

下面是一個錯誤的並發編程示例。在這個示例中,當調用 Counter.Value 方法時,將拷貝一個 Counter 接收值。作為接收值的一個欄位,Counter 接收值的各個 Mutex 欄位也會被拷貝。拷貝不是同步發生的,因此,拷貝的 Mutex 值可能會出錯。即便是沒有錯誤,拷貝的 Counter 接收值的訪問保護也是沒有意義的。

import "sync"

type Counter struct {
    sync.Mutex
    n int64
}

// This method is okay.
func (c *Counter) Increase(d int64) (r int64) {
    c.Lock()
    c.n += d
    r = c.n
    c.Unlock()
    return
}

// The method is bad. When it is called, a Counter
// receiver value will be copied.
func (c Counter) Value() (r int64) {
    c.Lock()
    r = c.n
    c.Unlock()
    return
}

我們只需要改變 Value 接收類型方法為指針類型 *Counter,就可以避免拷貝 Mutex 值。

在官方的 Go SDK 中提供的 go vet 命令將會報告潛在的錯誤值拷貝。

在錯誤的地方調用 sync.WaitGroup 的方法

每個 sync.WaitGroup 值維護一個內部計數器,這個計數器的初始值為 0。如果一個 WaitGroup 計數器的值是 0,調用 WaitGroup 值的 Wait 方法就不會被阻塞,否則,在計數器值為 0 之前,這個調用會一直被阻塞。

為了讓 WaitGroup 值的使用有意義,當一個 WaitGroup 計數器值為 0 時,必須在相應的 WaitGroup 值的 Wait 方法調用之前,去調用 WaitGroup 值的 Add 方法。

例如,下面的程序中,在不正確位置調用了 Add 方法,這將使最後列印出的數字不總是 100。事實上,這個程序最後列印的數字可能是在 [0, 100) 範圍內的一個隨意數字。原因就是 Add 方法的調用並不保證一定會發生在 Wait 方法調用之前。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var wg sync.WaitGroup
    var x int32 = 0
    for i := 0; i < 100; i++ {
        go func() {
            wg.Add(1)
            atomic.AddInt32(&x, 1)
            wg.Done()
        }()
    }

    fmt.Println("To wait ...")
    wg.Wait()
    fmt.Println(atomic.LoadInt32(&x))
}

為讓程序的表現符合預期,在 for 循環中,我們將把 Add 方法的調用移動到創建的新協程的範圍之外,修改後的代碼如下。

...
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            atomic.AddInt32(&x, 1)
            wg.Done()
        }()
    }
...

不正確使用 futures 信道

信道使用案例 的文章中,我們知道一些函數將返回 futures 信道。假設 fafb 就是這樣的兩個函數,那麼下面的調用就使用了不正確的 future 參數。

doSomethingWithFutureArguments(<-fa(), <-fb())

在上面的代碼行中,兩個信道接收操作是順序進行的,而不是並發的。我們做如下修改使它變成並發操作。

ca, cb := fa(), fb()
doSomethingWithFutureArguments(<-c1, <-c2)

沒有等協程的最後的活動的發送結束就關閉信道

Go 程序員經常犯的一個錯誤是,還有一些其它的協程可能會發送值到以前的信道時,這個信道就已經被關閉了。當這樣的發送(發送到一個已經關閉的信道)真實發生時,將引發一個異常。

這種錯誤在一些以往的著名 Go 項目中也有發生,比如在 Kubernetes 項目中的 這個 bug這個 bug

如何安全和優雅地關閉信道,請閱讀 這篇文章

在值上做 64 位原子操作時沒有保證值地址 64 位對齊

到目前為止(Go 1.10),在標準的 Go 編譯器中,在一個 64 位原子操作中涉及到的值的地址要求必須是 64 位對齊的。如果沒有對齊則導致當前的協程異常。對於標準的 Go 編譯器來說,這種失敗僅發生在 32 位的架構上。請閱讀 內存布局 去了解如何在一個 32 位操作系統上保證 64 位對齊。

沒有注意到大量的資源被 time.After 函數調用佔用

time 標準包中的 After 函數返回 一個延遲通知的信道。這個函數在某些情況下用起來很便捷,但是,每次調用它將創建一個 time.Timer 類型的新值。這個新創建的 Timer 值在通過傳遞參數到 After 函數指定期間保持激活狀態,如果在這個期間過多的調用了該函數,可能會有太多的 Timer 值保持激活,這將佔用大量的內存和計算資源。

例如,如果調用了下列的 longRunning 函數,將在一分鐘內產生大量的消息,然後在某些周期內將有大量的 Timer 值保持激活,即便是大量的這些 Timer 值已經沒用了也是如此。

import (
    "fmt"
    "time"
)

// The function will return if a message arrival interval
// is larger than one minute.
func longRunning(messages <-chan string) {
    for {
        select {
        case <-time.After(time.Minute):
            return
        case msg := <-messages:
            fmt.Println(msg)
        }
    }
}

為避免在上述代碼中創建過多的 Timer 值,我們將使用一個單一的 Timer 值去完成同樣的任務。

func longRunning(messages <-chan string) {
    timer := time.NewTimer(time.Minute)
    defer timer.Stop()

    for {
        select {
        case <-timer.C:
            return
        case msg := <-messages:
            fmt.Println(msg)
            if !timer.Stop() {
                <-timer.C
            }
        }

        // The above "if" block can also be put here.

        timer.Reset(time.Minute)
    }
}

不正確地使用 time.Timer

在最後,我們將展示一個符合語言使用習慣的 time.Timer 值的使用示例。需要注意的一個細節是,那個 Reset 方法總是在停止或者 time.Timer 值釋放時被使用。

select 塊的第一個 case 分支的結束部分,time.Timer 值被釋放,因此,我們不需要去停止它。但是必須在第二個分支中停止定時器。如果在第二個分支中 if 代碼塊缺失,它可能至少在 Reset 方法調用時,會(通過 Go 運行時)發送到 timer.C 信道,並且那個 longRunning 函數可能會早於預期返回,對於 Reset 方法來說,它可能僅僅是重置內部定時器為 0,它將不會清理(耗盡)那個發送到 timer.C 信道的值。

例如,下面的程序很有可能在一秒內而不是十秒時退出。並且更重要的是,這個程序並不是 DRF 的(LCTT 譯註:data race free,多線程程序的一種同步程度)。

package main

import (
    "fmt"
    "time"
)

func main() {
    start := time.Now()
    timer := time.NewTimer(time.Second/2)
    select {
    case <-timer.C:
    default:
        time.Sleep(time.Second) // go here
    }
    timer.Reset(time.Second * 10)
    <-timer.C
    fmt.Println(time.Since(start)) // 1.000188181s
}

time.Timer 的值不再被其它任何一個東西使用時,它的值可能被停留在一種非停止狀態,但是,建議在結束時停止它。

在多個協程中如果不按建議使用 time.Timer 值並發,可能會有 bug 隱患。

我們不應該依賴一個 Reset 方法調用的返回值。Reset 方法返回值的存在僅僅是為了兼容性目的。

via: https://go101.org/article/concurrent-common-mistakes.html

作者: 譯者:qhwdw 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出


本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive

對這篇文章感覺如何?

太棒了
0
不錯
0
愛死了
0
不太好
0
感覺很糟
0
雨落清風。心向陽

    You may also like

    Leave a reply

    您的郵箱地址不會被公開。 必填項已用 * 標註

    此站點使用Akismet來減少垃圾評論。了解我們如何處理您的評論數據

    More in:Linux中國