Linux中國

Streams:一個新的 Redis 通用數據結構

直到幾個月以前,對於我來說,在消息傳遞的環境中, streams 只是一個有趣且相對簡單的概念。這個概念在 Kafka 流行之後,我主要研究它們在 Disque 案例中的應用,Disque 是一個消息隊列,它將在 Redis 4.2 中被轉換為 Redis 的一個模塊。後來我決定讓 Disque 都用 AP 消息(LCTT 譯註:參見 CAP 定理),也就是說,它將在不需要客戶端過多參與的情況下實現容錯和可用性,這樣一來,我更加確定地認為流的概念在那種情況下並不適用。

然而在那時 Redis 有個問題,那就是預設情況下導出數據結構並不輕鬆。它在 Redis 列表 list 有序集 sorted list 發布/訂閱 Pub/Sub 功能之間有某些缺陷。你可以權衡使用這些工具對一系列消息或事件建模。

有序集是內存消耗大戶,那自然就不能對投遞的相同消息進行一次又一次的建模,客戶端不能阻塞新消息。因為有序集並不是一個序列化的數據結構,它是一個元素可以根據它們量的變化而移動的集合:所以它不像時序性的數據那樣。

列表有另外的問題,它在某些特定的用例中會產生類似的適用性問題:你無法瀏覽列表中間的內容,因為在那種情況下,訪問時間是線性的。此外,沒有任何指定輸出的功能,列表上的阻塞操作僅為單個客戶端提供單個元素。列表中沒有固定的元素標識,也就是說,不能指定從哪個元素開始給我提供內容。

對於一對多的工作任務,有發布/訂閱機制,它在大多數情況下是非常好的,但是,對於某些不想 「即發即棄」 fire-and-forget 的東西:保留一個歷史是很重要的,不只是因為是斷開之後會重新獲得消息,也因為某些如時序性的消息列表,用範圍查詢瀏覽是非常重要的:比如在這 10 秒範圍內溫度讀數是多少?

我試圖解決上述問題,我想規劃一個通用的有序集合,並列入一個獨特的、更靈活的數據結構,然而,我的設計嘗試最終以生成一個比當前的數據結構更加矯揉造作的結果而告終。Redis 有個好處,它的數據結構導出更像自然的計算機科學的數據結構,而不是 「Salvatore 發明的 API」。因此,我最終停止了我的嘗試,並且說,「ok,這是我們目前能提供的」,或許我會為發布/訂閱增加一些歷史信息,或者為列表訪問增加一些更靈活的方式。然而,每次在會議上有用戶對我說 「你如何在 Redis 中模擬時間系列」 或者類似的問題時,我的臉就綠了。

起源

在 Redis 4.0 中引入模塊之後,用戶開始考慮他們自己怎麼去修復這些問題。其中一個用戶 Timothy Downs 通過 IRC 和我說道:

<forkfork> 我計劃給這個模塊增加一個事務日誌式的數據類型 —— 這意味著大量的訂閱者可以在不導致 redis 內存激增的情況下做一些像發布/訂閱那樣的事情
<forkfork> 訂閱者持有他們在消息隊列中的位置,而不是讓 Redis 必須維護每個消費者的位置和為每個訂閱者複製消息

他的思路啟發了我。我想了幾天,並且意識到這可能是我們馬上同時解決上面所有問題的契機。我需要去重新構思 「日誌」 的概念是什麼。日誌是個基本的編程元素,每個人都使用過它,因為它只是簡單地以追加模式打開一個文件,並以一定的格式寫入數據。然而 Redis 數據結構必須是抽象的。它們在內存中,並且我們使用內存並不是因為我們懶,而是因為使用一些指針,我們可以概念化數據結構並把它們抽象,以使它們擺脫明確的限制。例如,一般來說日誌有幾個問題:偏移不是邏輯化的,而是真實的位元組偏移,如果你想要與條目插入的時間相關的邏輯偏移應該怎麼辦?我們有範圍查詢可用。同樣,日誌通常很難進行垃圾回收:在一個只能進行追加操作的數據結構中怎麼去刪除舊的元素?好吧,在我們理想的日誌中,我們只需要說,我想要數字最大的那個條目,而舊的元素一個也不要,等等。

當我從 Timothy 的想法中受到啟發,去嘗試著寫一個規範的時候,我使用了 Redis 集群中的 radix 樹去實現,優化了它內部的某些部分。這為實現一個有效利用空間的日誌提供了基礎,而且仍然有可能在 對數時間 logarithmic time 內訪問範圍。同時,我開始去讀關於 Kafka 的流相關的內容以獲得另外的靈感,它也非常適合我的設計,最後借鑒了 Kafka 消費組 consumer groups 的概念,並且再次針對 Redis 進行優化,以適用於 Redis 在內存中使用的情況。然而,該規範僅停留在紙面上,在一段時間後我幾乎把它從頭到尾重寫了一遍,以便將我與別人討論的所得到的許多建議一起增加到 Redis 升級中。我希望 Redis 流能成為對於時間序列有用的特性,而不僅是一個常見的事件和消息類的應用程序。

讓我們寫一些代碼吧

從 Redis 大會回來後,整個夏天我都在實現一個叫 listpack 的庫。這個庫是 ziplist.c 的繼任者,那是一個表示在單個分配中的字元串元素列表的數據結構。它是一個非常特殊的序列化格式,其特點在於也能夠以逆序(從右到左)解析:以便在各種用例中替代 ziplists。

結合 radix 樹和 listpacks 的特性,它可以很容易地去構建一個空間高效的日誌,並且還是可索引的,這意味著允許通過 ID 和時間進行隨機訪問。自從這些就緒後,我開始去寫一些代碼以實現流數據結構。我還在完成這個實現,不管怎樣,現在在 Github 上的 Redis 的 streams 分支里它已經可以跑起來了。我並沒有聲稱那個 API 是 100% 的最終版本,但是,這有兩個有意思的事實:一,在那時只有消費群組是缺失的,加上一些不太重要的操作流的命令,但是,所有的大的方面都已經實現了。二,一旦各個方面比較穩定了之後,我決定大概用兩個月的時間將所有的流的特性 向後移植 backport 到 4.0 分支。這意味著 Redis 用戶想要使用流,不用等待 Redis 4.2 發布,它們在生產環境馬上就可用了。這是可能的,因為作為一個新的數據結構,幾乎所有的代碼改變都出現在新的代碼裡面。除了阻塞列表操作之外:該代碼被重構了,我們對於流和列表阻塞操作共享了相同的代碼,而極大地簡化了 Redis 內部實現。

教程:歡迎使用 Redis 的 streams

在某些方面,你可以認為流是 Redis 列表的一個增強版本。流元素不再是一個單一的字元串,而是一個 欄位 field value 組成的對象。範圍查詢更適用而且更快。在流中,每個條目都有一個 ID,它是一個邏輯偏移量。不同的客戶端可以 阻塞等待 blocking-wait 比指定的 ID 更大的元素。Redis 流的一個基本的命令是 XADD。是的,所有的 Redis 流命令都是以一個 X 為前綴的。

> XADD mystream * sensor-id 1234 temperature 10.5
1506871964177.0

這個 XADD 命令將追加指定的條目作為一個指定的流 —— 「mystream」 的新元素。上面示例中的這個條目有兩個欄位:sensor-idtemperature,每個條目在同一個流中可以有不同的欄位。使用相同的欄位名可以更好地利用內存。有意思的是,欄位的排序是可以保證順序的。XADD 僅返回插入的條目的 ID,因為在第三個參數中是星號(*),表示由命令自動生成 ID。通常這樣做就夠了,但是也可以去強制指定一個 ID,這種情況用於複製這個命令到 從伺服器 slave server AOF append-only file 文件。

這個 ID 是由兩部分組成的:一個毫秒時間和一個序列號。1506871964177 是毫秒時間,它只是一個毫秒級的 UNIX 時間戳。圓點(.)後面的數字 0 是一個序號,它是為了區分相同毫秒數的條目增加上去的。這兩個數字都是 64 位的無符號整數。這意味著,我們可以在流中增加所有想要的條目,即使是在同一毫秒中。ID 的毫秒部分使用 Redis 伺服器的當前本地時間生成的 ID 和流中的最後一個條目 ID 兩者間的最大的一個。因此,舉例來說,即使是計算機時間回跳,這個 ID 仍然是增加的。在某些情況下,你可以認為流條目的 ID 是完整的 128 位數字。然而,事實上它們與被添加到的實例的本地時間有關,這意味著我們可以在毫秒級的精度的範圍隨意查詢。

正如你想的那樣,快速添加兩個條目後,結果是僅一個序號遞增了。我們可以用一個 MULTI/EXEC 塊來簡單模擬「快速插入」:

> MULTI
OK
> XADD mystream * foo 10
QUEUED
> XADD mystream * bar 20
QUEUED
> EXEC
1) 1506872463535.0
2) 1506872463535.1

在上面的示例中,也展示了無需指定任何初始 模式 schema 的情況下,對不同的條目使用不同的欄位。會發生什麼呢?就像前面提到的一樣,只有每個塊(它通常包含 50-150 個消息內容)的第一個消息被使用。並且,相同欄位的連續條目都使用了一個標誌進行了壓縮,這個標誌表示與「它們與這個塊中的第一個條目的欄位相同」。因此,使用相同欄位的連續消息可以節省許多內存,即使是欄位集隨著時間發生緩慢變化的情況下也很節省內存。

為了從流中檢索數據,這裡有兩種方法:範圍查詢,它是通過 XRANGE 命令實現的; 流播 streaming ,它是通過 XREAD 命令實現的。XRANGE 命令僅取得包括從開始到停止範圍內的全部條目。因此,舉例來說,如果我知道它的 ID,我可以使用如下的命名取得單個條目:

> XRANGE mystream 1506871964177.0 1506871964177.0
1) 1) 1506871964177.0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "10.5"

不管怎樣,你都可以使用指定的開始符號 - 和停止符號 + 表示最小和最大的 ID。為了限制返回條目的數量,也可以使用 COUNT 選項。下面是一個更複雜的 XRANGE 示例:

> XRANGE mystream - + COUNT 2
1) 1) 1506871964177.0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "10.5"
2) 1) 1506872463535.0
   2) 1) "foo"
      2) "10"

這裡我們講的是 ID 的範圍,然後,為了取得在一個給定時間範圍內的特定範圍的元素,你可以使用 XRANGE,因為 ID 的「序號」 部分可以省略。因此,你可以只指定「毫秒」時間即可,下面的命令的意思是:「從 UNIX 時間 1506872463 開始給我 10 個條目」:

127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 10
1) 1) 1506872463535.0
   2) 1) "foo"
      2) "10"
2) 1) 1506872463535.1
   2) 1) "bar"
      2) "20"

關於 XRANGE 需要注意的最重要的事情是,假設我們在回復中收到 ID,隨後連續的 ID 只是增加了序號部分,所以可以使用 XRANGE 遍歷整個流,接收每個調用的指定個數的元素。Redis 中的*SCAN 系列命令允許迭代 Redis 數據結構,儘管事實上它們不是為迭代設計的,但這樣可以避免再犯相同的錯誤。

使用 XREAD 處理流播:阻塞新的數據

當我們想通過 ID 或時間去訪問流中的一個範圍或者是通過 ID 去獲取單個元素時,使用 XRANGE 是非常完美的。然而,在使用流的案例中,當數據到達時,它必須由不同的客戶端來消費時,這就不是一個很好的解決方案,這需要某種形式的 匯聚池 pooling 。(對於 某些 應用程序來說,這可能是個好主意,因為它們僅是偶爾連接查詢的)

XREAD 命令是為讀取設計的,在同一個時間,從多個流中僅指定我們從該流中得到的最後條目的 ID。此外,如果沒有數據可用,我們可以要求阻塞,當數據到達時,就解除阻塞。類似於阻塞列表操作產生的效果,但是這裡並沒有消費從流中得到的數據,並且多個客戶端可以同時訪問同一份數據。

這裡有一個典型的 XREAD 調用示例:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ $

它的意思是:從 mystreamotherstream 取得數據。如果沒有數據可用,阻塞客戶端 5000 毫秒。在 STREAMS 選項之後指定我們想要監聽的關鍵字,最後的是指定想要監聽的 ID,指定的 ID 為 $ 的意思是:假設我現在需要流中的所有元素,因此,只需要從下一個到達的元素開始給我。

如果我從另一個客戶端發送這樣的命令:

> XADD otherstream * message 「Hi There」

XREAD 側會出現什麼情況呢?

1) 1) "otherstream"
   2) 1) 1) 1506935385635.0
         2) 1) "message"
            2) "Hi There"

與收到的數據一起,我們也得到了數據的關鍵字。在下次調用中,我們將使用接收到的最新消息的 ID:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0

依次類推。然而需要注意的是使用方式,客戶端有可能在一個非常大的延遲之後再次連接(因為它處理消息需要時間,或者其它什麼原因)。在這種情況下,期間會有很多消息堆積,為了確保客戶端不被消息淹沒,以及伺服器不會因為給單個客戶端提供大量消息而浪費太多的時間,使用 XREADCOUNT 選項是非常明智的。

流封頂

目前看起來還不錯……然而,有些時候,流需要刪除一些舊的消息。幸運的是,這可以使用 XADD 命令的 MAXLEN 選項去做:

> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2

它是基本意思是,如果在流中添加新元素後發現消息數量超過了 1000000 個,那麼就刪除舊的消息,以便於元素總量重新回到 1000000 以內。它很像是在列表中使用的 RPUSH + LTRIM,但是,這次我們是使用了一個內置機制去完成的。然而,需要注意的是,上面的意思是每次我們增加一個新的消息時,我們還需要另外的工作去從流中刪除舊的消息。這將消耗一些 CPU 資源,所以在計算 MAXLEN 之前,儘可能使用 ~ 符號,以表明我們不要求非常 精確 的 1000000 個消息,就是稍微多一些也不是大問題:

> XADD mystream MAXLEN ~ 1000000 * foo bar

這種方式的 XADD 僅當它可以刪除整個節點的時候才會刪除消息。相比普通的 XADD,這種方式幾乎可以自由地對流進行封頂。

消費組(開發中)

這是第一個 Redis 中尚未實現而在開發中的特性。靈感也是來自 Kafka,儘管在這裡是以不同的方式實現的。重點是使用了 XREAD,客戶端也可以增加一個 GROUP <name> 選項。相同組的所有客戶端將自動得到 不同的 消息。當然,同一個流可以被多個組讀取。在這種情況下,所有的組將收到流中到達的消息的相同副本。但是,在每個組內,消息是不會重複的。

當指定組時,能夠指定一個 RETRY <milliseconds> 選項去擴展組:在這種情況下,如果消息沒有通過 XACK 進行確認,它將在指定的毫秒數後進行再次投遞。這將為消息投遞提供更佳的可靠性,這種情況下,客戶端沒有私有的方法將消息標記為已處理。這一部分也正在開發中。

內存使用和節省載入時間

因為用來建模 Redis 流的設計,內存使用率是非常低的。這取決於它們的欄位、值的數量和長度,對於簡單的消息,每使用 100MB 內存可以有幾百萬條消息。此外,該格式設想為需要極少的序列化:listpack 塊以 radix 樹節點方式存儲,在磁碟上和內存中都以相同方式表示的,因此它們可以很輕鬆地存儲和讀取。例如,Redis 可以在 0.3 秒內從 RDB 文件中讀取 500 萬個條目。這使流的複製和持久存儲非常高效。

我還計劃允許從條目中間進行部分刪除。現在僅實現了一部分,策略是在條目在標記中標識條目為已刪除,並且,當已刪除條目佔全部條目的比例達到指定值時,這個塊將被回收重寫,如果需要,它將被連到相鄰的另一個塊上,以避免碎片化。

關於最終發布時間的結論

Redis 的流特性將包含在年底前(LCTT 譯註:本文原文發佈於 2017 年 10 月)推出的 Redis 4.0 系列的穩定版中。我認為這個通用的數據結構將為 Redis 提供一個巨大的補丁,以用於解決很多現在很難以解決的情況:那意味著你(之前)需要創造性地「濫用」當前提供的數據結構去解決那些問題。一個非常重要的使用場景是時間序列,但是,我覺得對於其它場景來說,通過 TREAD 來流播消息將是非常有趣的,因為對於那些需要更高可靠性的應用程序,可以使用發布/訂閱模式來替換「即用即棄」,還有其它全新的使用場景。現在,如果你想在有問題環境中評估這個新數據結構,可以更新 GitHub 上的 streams 分支開始試用。歡迎向我們報告所有的 bug。:-)

如果你喜歡觀看視頻的方式,這裡有一個現場演示:https://www.youtube.com/watch?v=ELDzy9lCFHQ

via: http://antirez.com/news/114

作者:antirez 譯者:qhwdw 校對:wxy, pityonline

本文由 LCTT 原創編譯,Linux中國 榮譽推出


本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive

對這篇文章感覺如何?

太棒了
0
不錯
0
愛死了
0
不太好
0
感覺很糟
0
雨落清風。心向陽

    You may also like

    Leave a reply

    您的郵箱地址不會被公開。 必填項已用 * 標註

    此站點使用Akismet來減少垃圾評論。了解我們如何處理您的評論數據

    More in:Linux中國