Linux中國

Go 並發編程中的經驗教訓

在複雜的分散式系統進行任務處理時,你通常會需要進行並發的操作。在 Mode.net 公司,我們每天都要和實時、快速和靈活的軟體打交道。而沒有一個高度並發的系統,就不可能構建一個毫秒級的動態地路由數據包的全球專用網路。這個動態路由是基於網路狀態的,儘管這個過程需要考慮眾多因素,但我們的重點是鏈路指標。在我們的環境中,鏈路指標可以是任何跟網路鏈接的狀態和當前屬性(如鏈接延遲)有關的任何內容。

並發探測鏈接監控

我們的動態路由演算法 H.A.L.O. 逐跳自適應鏈路狀態最佳路由 Hop-by-Hop Adaptive Link-State Optimal Routing )部分依賴於鏈路指標來計算路由表。這些指標由位於每個 PoP( 存活節點 Point of Presence )上的獨立組件收集。PoP 是表示我們的網路中單個路由實體的機器,通過鏈路連接並分布在我們的網路拓撲中的各個位置。某個組件使用網路數據包探測周圍的機器,周圍的機器回複數據包給前者。從接收到的探測包中可以獲得鏈路延遲。由於每個 PoP 都有不止一個臨近節點,所以這種探測任務實質上是並發的:我們需要實時測量每個臨近連接點的延遲。我們不能串列地處理;為了計算這個指標,必須儘快處理每個探測。

![latency computation graph](/data/attachment/album/202001/13/151444uzg0en5bhbet0hyb.png "latency computation graph")

序列號和重置:一個重新排列場景

我們的探測組件互相發送和接收數據包,並依靠序列號進行數據包處理。這旨在避免處理重複的包或順序被打亂的包。我們的第一個實現依靠特殊的序列號 0 來重置序列號。這個數字僅在組件初始化時使用。主要的問題是我們考慮了遞增的序列號總是從 0 開始。在該組件重啟後,包的順序可能會重新排列,某個包的序列號可能會輕易地被替換成重置之前使用過的值。這意味著,後繼的包都會被忽略掉,直到排到重置之前用到的序列值。

UDP 握手和有限狀態機

這裡的問題是該組件重啟前後的序列號是否一致。有幾種方法可以解決這個問題,經過討論,我們選擇了實現一個帶有清晰狀態定義的三步握手協議。這個握手過程在初始化時通過鏈接建立會話。這樣可以確保節點通過同一個會話進行通信且使用了適當的序列號。

為了正確實現這個過程,我們必須定義一個有清晰狀態和過渡的有限狀態機。這樣我們就可以正確管理握手過程中的所有極端情況。

![finite state machine diagram](/data/attachment/album/202001/13/151307dqmm9hm5yppcxpcq.png "finite state machine diagram")

會話 ID 由握手的初始化程序生成。一個完整的交換順序如下:

  1. 發送者發送一個 SYN(ID) 數據包。
  2. 接收者存儲接收到的 ID 並發送一個 SYN-ACK(ID)
  3. 發送者接收到 SYN-ACK(ID) 並發送一個 ACK(ID)。它還發送一個從序列號 0 開始的數據包。
  4. 接收者檢查最後接收到的 ID,如果 ID 匹配,則接受 ACK(ID)。它還開始接受序列號為 0 的數據包。

處理狀態超時

基本上,每種狀態下你都需要處理最多三種類型的事件:鏈接事件、數據包事件和超時事件。這些事件會並發地出現,因此你必須正確處理並發。

  • 鏈接事件包括網路連接或網路斷開的變化,相應的初始化一個鏈接會話或斷開一個已建立的會話。
  • 數據包事件是控制數據包(SYN/SYN-ACK/ACK)或只是探測響應。
  • 超時事件在當前會話狀態的預定超時時間到期後觸發。

這裡面臨的最主要的問題是如何處理並發的超時到期和其他事件。這裡很容易陷入死鎖和資源競爭的陷阱。

第一種方法

本項目使用的語言是 Golang。它確實提供了原生的同步機制,如自帶的通道和鎖,並且能夠使用輕量級線程來進行並發處理。

![gophers hacking together](/data/attachment/album/202001/13/151314tx43zcpl3rrg1fta.png "gophers hacking together")

gopher 們聚眾狂歡

首先,你可以設計兩個分別表示我們的會話和超時處理程序的結構體。

type Session struct {  
  State SessionState  
  Id SessionId  
  RemoteIp string  
}

type TimeoutHandler struct {  
  callback func(Session)  
  session Session  
  duration int  
  timer *timer.Timer  
}

Session 標識連接會話,內有表示會話 ID、臨近的連接點的 IP 和當前會話狀態的欄位。

TimeoutHandler 包含回調函數、對應的會話、持續時間和指向調度計時器的指針。

每一個臨近連接點的會話都包含一個保存調度 TimeoutHandler 的全局映射。

SessionTimeout map[Session]*TimeoutHandler

下面方法註冊和取消超時:

// schedules the timeout callback function.  
func (timeout* TimeoutHandler) Register() {  
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {  
    timeout.callback(timeout.session)  
  })  
}

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.timer == nil {  
    return  
  }  
  timeout.timer.Stop()  
}

你可以使用類似下面的方法來創建和存儲超時:

func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {  
  if sessionTimeout[session] == nil {  
    sessionTimeout[session] := new(TimeoutHandler)  
  }  

  timeout = sessionTimeout[session]  
  timeout.session = session  
  timeout.callback = callback  
  timeout.duration = duration  
  return timeout  
}

超時處理程序創建後,會在經過了設置的 duration 時間(秒)後執行回調函數。然而,有些事件會使你重新調度一個超時處理程序(與 SYN 狀態時的處理一樣,每 3 秒一次)。

為此,你可以讓回調函數重新調度一次超時:

func synCallback(session Session) {  
  sendSynPacket(session)

  // reschedules the same callback.  
  newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)  
  newTimeout.Register()

  sessionTimeout[state] = newTimeout  
}

這次回調在新的超時處理程序中重新調度自己,並更新全局映射 sessionTimeout

數據競爭和引用

你的解決方案已經有了。可以通過檢查計時器到期後超時回調是否執行來進行一個簡單的測試。為此,註冊一個超時,休眠 duration 秒,然後檢查是否執行了回調的處理。執行這個測試後,最好取消預定的超時時間(因為它會重新調度),這樣才不會在下次測試時產生副作用。

令人驚訝的是,這個簡單的測試發現了這個解決方案中的一個問題。使用 cancel 方法來取消超時並沒有正確處理。以下順序的事件會導致數據資源競爭:

  1. 你有一個已調度的超時處理程序。
  2. 線程 1:
    1. 你接收到一個控制數據包,現在你要取消已註冊的超時並切換到下一個會話狀態(如發送 SYN 後接收到一個 SYN-ACK
    2. 你調用了 timeout.Cancel(),這個函數調用了 timer.Stop()。(請注意,Golang 計時器的停止不會終止一個已過期的計時器。)
  3. 線程 2:
    1. 在取消調用之前,計時器已過期,回調即將執行。
    2. 執行回調,它調度一次新的超時並更新全局映射。
  4. 線程 1:
    1. 切換到新的會話狀態並註冊新的超時,更新全局映射。

兩個線程並發地更新超時映射。最終結果是你無法取消註冊的超時,然後你也會丟失對線程 2 重新調度的超時的引用。這導致處理程序在一段時間內持續執行和重新調度,出現非預期行為。

鎖也解決不了問題

使用鎖也不能完全解決問題。如果你在處理所有事件和執行回調之前加鎖,它仍然不能阻止一個過期的回調運行:

func (timeout* TimeoutHandler) Register() {  
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {  
    stateLock.Lock()  
    defer stateLock.Unlock()

    timeout.callback(timeout.session)  
  })  
}

現在的區別就是全局映射的更新是同步的,但是這還是不能阻止在你調用 timeout.Cancel() 後回調的執行 —— 這種情況出現在調度計時器過期了但是還沒有拿到鎖的時候。你還是會丟失一個已註冊的超時的引用。

使用取消通道

你可以使用取消通道,而不必依賴不能阻止到期的計時器執行的 golang 函數 timer.Stop()

這是一個略有不同的方法。現在你可以不用再通過回調進行遞歸地重新調度;而是註冊一個死循環,這個循環接收到取消信號或超時事件時終止。

新的 Register() 產生一個新的 go 線程,這個線程在超時後執行你的回調,並在前一個超時執行後調度新的超時。返回給調用方一個取消通道,用來控制循環的終止。

func (timeout *TimeoutHandler) Register() chan struct{} {  
  cancelChan := make(chan struct{})  

  go func () {  
    select {  
    case _ = <- cancelChan:  
      return  
    case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
      func () {  
        stateLock.Lock()  
        defer stateLock.Unlock()

        timeout.callback(timeout.session)  
      } ()  
    }  
  } ()

  return cancelChan  
}

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.cancelChan == nil {  
    return  
  }  
  timeout.cancelChan <- struct{}{}  
}

這個方法給你註冊的所有超時提供了取消通道。一個取消調用向通道發送一個空結構體並觸發取消操作。然而,這並不能解決前面的問題;可能在你通過通道取消之前以及超時線程拿到鎖之前,超時時間就已經到了。

這裡的解決方案是,在拿到鎖之後,檢查一下超時範圍內的取消通道。

  case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
    func () {  
      stateLock.Lock()  
      defer stateLock.Unlock()  

      select {  
      case _ = <- handler.cancelChan:  
        return  
      default:  
        timeout.callback(timeout.session)  
      }  
    } ()  
  }

最終,這可以確保在拿到鎖之後執行回調,不會觸發取消操作。

小心死鎖

這個解決方案看起來有效;但是還是有個隱患:死鎖

請閱讀上面的代碼,試著自己找到它。考慮下描述的所有函數的並發調用。

這裡的問題在取消通道本身。我們創建的是無緩衝通道,即發送的是阻塞調用。當你在一個超時處理程序中調用取消函數時,只有在該處理程序被取消後才能繼續處理。問題出現在,當你有多個調用請求到同一個取消通道時,這時一個取消請求只被處理一次。當多個事件同時取消同一個超時處理程序時,如連接斷開或控制包事件,很容易出現這種情況。這會導致死鎖,可能會使應用程序停機。

![gophers on a wire, talking](/data/attachment/album/202001/13/151316k1dj066zm1y3yq0f.jpg "gophers on a wire, talking")

有人在聽嗎?

(已獲得 Trevor Forrey 授權。)

這裡的解決方案是創建通道時指定緩存大小至少為 1,這樣向通道發送數據就不會阻塞,也顯式地使發送變成非阻塞的,避免了並發調用。這樣可以確保取消操作只發送一次,並且不會阻塞後續的取消調用。

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.cancelChan == nil {  
    return  
  }  

  select {  
  case timeout.cancelChan <- struct{}{}:  
  default:  
    // can』t send on the channel, someone has already requested the cancellation.  
  }  
}

總結

在實踐中你學到了並發操作時出現的常見錯誤。由於其不確定性,即使進行大量的測試,也不容易發現這些問題。下面是我們在最初的實現中遇到的三個主要問題:

在非同步的情況下更新共享數據

這似乎是個很明顯的問題,但如果並發更新發生在不同的位置,就很難發現。結果就是數據競爭,由於一個更新會覆蓋另一個,因此對同一數據的多次更新中會有某些更新丟失。在我們的案例中,我們是在同時更新同一個共享映射里的調度超時引用。(有趣的是,如果 Go 檢測到在同一個映射對象上的並發讀寫,會拋出致命錯誤 — 你可以嘗試下運行 Go 的數據競爭檢測器)。這最終會導致丟失超時引用,且無法取消給定的超時。當有必要時,永遠不要忘記使用鎖。

![gopher assembly line](/data/attachment/album/202001/13/151317woonkapqz7punqoz.jpg "gopher assembly line")

不要忘記同步 gopher 們的工作

缺少條件檢查

在不能僅依賴鎖的獨佔性的情況下,就需要進行條件檢查。我們遇到的場景稍微有點不一樣,但是核心思想跟條件變數是一樣的。假設有個一個生產者和多個消費者使用一個共享隊列的經典場景,生產者可以將一個元素添加到隊列並喚醒所有消費者。這個喚醒調用意味著隊列中的數據是可訪問的,並且由於隊列是共享的,消費者必須通過鎖來進行同步訪問。每個消費者都可能拿到鎖;然而,你仍然需要檢查隊列中是否有元素。因為在你拿到鎖的瞬間並不知道隊列的狀態,所以還是需要進行條件檢查。

在我們的例子中,超時處理程序收到了計時器到期時發出的「喚醒」調用,但是它仍需要檢查是否已向其發送了取消信號,然後才能繼續執行回調。

![gopher boot camp](/data/attachment/album/202001/13/151320iku72a0b8582lzyu.png "gopher boot camp")

如果你要喚醒多個 gopher,可能就需要進行條件檢查

死鎖

當一個線程被卡住,無限期地等待一個喚醒信號,但是這個信號永遠不會到達時,就會發生這種情況。死鎖可以通過讓你的整個程序停機來徹底殺死你的應用。

在我們的案例中,這種情況的發生是由於多次發送請求到一個非緩衝且阻塞的通道。這意味著向通道發送數據只有在從這個通道接收完數據後才能返回。我們的超時線程循環迅速從取消通道接收信號;然而,在接收到第一個信號後,它將跳出循環,並且再也不會從這個通道讀取數據。其他的調用會一直被卡住。為避免這種情況,你需要仔細檢查代碼,謹慎處理阻塞調用,並確保不會發生線程飢餓。我們例子中的解決方法是使取消調用成為非阻塞調用 — 我們不需要阻塞調用。

via: https://opensource.com/article/19/12/go-common-pitfalls

作者:Eduardo Ferreira 選題:lujun9972 譯者:lxbwolf 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出


本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive

對這篇文章感覺如何?

太棒了
0
不錯
0
愛死了
0
不太好
0
感覺很糟
0
雨落清風。心向陽

    You may also like

    Leave a reply

    您的郵箱地址不會被公開。 必填項已用 * 標註

    此站點使用Akismet來減少垃圾評論。了解我們如何處理您的評論數據

    More in:Linux中國