使用 MQTT 在項目中實現數據收發
去年 11 月我們購買了一輛電動汽車,同時也引發了有趣的思考:我們應該什麼時候為電動汽車充電?對於電動汽車充電所用的電,我希望能夠對應最小的二氧化碳排放,歸結為一個特定的問題:對於任意給定時刻,每千瓦時對應的二氧化碳排放量是多少,一天中什麼時間這個值最低?
尋找數據
我住在紐約州,大約 80% 的電力消耗可以自給自足,主要來自天然氣、水壩(大部分來自於 尼亞加拉 大瀑布)、核能發電,少部分來自風力、太陽能和其它化石燃料發電。非盈利性組織 紐約獨立電網運營商 (NYISO)負責整個系統的運作,實現發電機組發電與用電之間的平衡,同時也是紐約路燈系統的監管部門。
儘管沒有為公眾提供公開 API,NYISO 還是盡責提供了不少公開數據供公眾使用。每隔 5 分鐘彙報全州各個發電機組消耗的燃料數據。數據以 CSV 文件的形式發佈於公開的檔案庫中,全天更新。如果你了解不同燃料對發電瓦數的貢獻比例,你可以比較準確的估計任意時刻的二氧化碳排放情況。
在構建收集處理公開數據的工具時,我們應該時刻避免過度使用這些資源。相比將這些數據打包並發送給所有人,我們有更好的方案。我們可以創建一個低開銷的 事件流 ,人們可以訂閱並第一時間得到消息。我們可以使用 MQTT 實現該方案。我的項目(ny-power.org)目標是收錄到 Home Assistant 項目中;後者是一個開源的 家庭自動化 平台,擁有數十萬用戶。如果所有用戶同時訪問 CSV 文件伺服器,估計 NYISO 不得不增加訪問限制。
MQTT 是什麼?
MQTT 是一個 發布訂閱線路協議 ,為小規模設備設計。發布訂閱系統工作原理類似於消息匯流排。你將一條消息發布到一個 主題 上,那麼所有訂閱了該主題的客戶端都可以獲得該消息的一份拷貝。對於消息發送者而言,無需知道哪些人在訂閱消息;你只需將消息發布到一系列主題,並訂閱一些你感興趣的主題。就像參加了一場聚會,你選取並加入感興趣的對話。
MQTT 能夠構建極為高效的應用。客戶端訂閱有限的幾個主題,也只收到它們感興趣的內容。不僅節省了處理時間,還降低了網路帶寬使用。
作為一個開放標準,MQTT 有很多開源的客戶端和服務端實現。對於你能想到的每種編程語言,都有對應的客戶端庫;甚至有嵌入到 Arduino 的庫,可以構建感測器網路。服務端可供選擇的也很多,我的選擇是 Eclipse 項目提供的 Mosquitto 服務端,這是因為它體積小、用 C 編寫,可以承載數以萬計的訂閱者。
為何我喜愛 MQTT
在過去二十年間,我們為軟體應用設計了可靠且準確的模型,用於解決服務遇到的問題。我還有其它郵件嗎?當前的天氣情況如何?我應該此刻購買這種產品嗎?在絕大多數情況下,這種 問答式 的模型工作良好;但對於一個數據爆炸的世界,我們需要其它的模型。MQTT 的發布訂閱模型十分強大,可以將大量數據發送到系統中。客戶可以訂閱數據中的一小部分並在訂閱數據發布的第一時間收到更新。
MQTT 還有一些有趣的特性,其中之一是 遺囑 消息,可以用於區分兩種不同的靜默,一種是沒有主題相關數據推送,另一種是你的數據接收器出現故障。MQTT 還包括 保留消息 ,當客戶端初次連接時會提供相關主題的最後一條消息。這對那些更新緩慢的主題來說很有必要。
我在 Home Assistant 項目開發過程中,發現這種消息匯流排模型對 異構系統 尤為適合。如果你深入 物聯網 領域,你會發現 MQTT 無處不在。
我們的第一個 MQTT 流
NYSO 公布的 CSV 文件中有一個是實時的燃料混合使用情況。每 5 分鐘,NYSO 發布這 5 分鐘內發電使用的燃料類型和相應的發電量(以兆瓦為單位)。
這個 CSV 文件看起來像這樣:
時間戳 | 時區 | 燃料類型 | 兆瓦為單位的發電量 |
---|---|---|---|
05/09/2018 00:05:00 | EDT | 混合燃料 | 1400 |
05/09/2018 00:05:00 | EDT | 天然氣 | 2144 |
05/09/2018 00:05:00 | EDT | 核能 | 4114 |
05/09/2018 00:05:00 | EDT | 其它化石燃料 | 4 |
05/09/2018 00:05:00 | EDT | 其它可再生資源 | 226 |
05/09/2018 00:05:00 | EDT | 風力 | 1 |
05/09/2018 00:05:00 | EDT | 水力 | 3229 |
05/09/2018 00:10:00 | EDT | 混合燃料 | 1307 |
05/09/2018 00:10:00 | EDT | 天然氣 | 2092 |
05/09/2018 00:10:00 | EDT | 核能 | 4115 |
05/09/2018 00:10:00 | EDT | 其它化石燃料 | 4 |
05/09/2018 00:10:00 | EDT | 其它可再生資源 | 224 |
05/09/2018 00:10:00 | EDT | 風力 | 40 |
05/09/2018 00:10:00 | EDT | 水力 | 3166 |
表中唯一令人不解就是燃料類別中的混合燃料。紐約的大多數天然氣工廠也通過燃燒其它類型的化石燃料發電。在冬季寒潮到來之際,家庭供暖的優先順序高於發電;但這種情況出現的次數不多,(在我們計算中)可以將混合燃料類型看作天然氣類型。
CSV 文件全天更新。我編寫了一個簡單的數據泵,每隔 1 分鐘檢查是否有數據更新,並將新條目發布到 MQTT 伺服器的一系列主題上,主題名稱基本與 CSV 文件有一定的對應關係。數據內容被轉換為 JSON 對象,方便各種編程語言處理。
ny-power/upstream/fuel-mix/Hydro {"units": "MW", "value": 3229, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Dual Fuel {"units": "MW", "value": 1400, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Natural Gas {"units": "MW", "value": 2144, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Other Fossil Fuels {"units": "MW", "value": 4, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Wind {"units": "MW", "value": 41, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Other Renewables {"units": "MW", "value": 226, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Nuclear {"units": "MW", "value": 4114, "ts": "05/09/2018 00:05:00"}
這種直接的轉換是種不錯的嘗試,可將公開數據轉換為公開事件。我們後續會繼續將數據轉換為二氧化碳排放強度,但這些原始數據還可被其它應用使用,用於其它計算用途。
MQTT 主題
主題和 主題結構 是 MQTT 的一個主要特色。與其它標準的企業級消息匯流排不同,MQTT 的主題無需事先註冊。發送者可以憑空創建主題,唯一的限制是主題的長度,不超過 220 字元。其中 /
字元有特殊含義,用於創建主題的層次結構。我們即將看到,你可以訂閱這些層次中的一些分片。
基於開箱即用的 Mosquitto,任何一個客戶端都可以向任何主題發布消息。在原型設計過程中,這種方式十分便利;但一旦部署到生產環境,你需要增加 訪問控制列表 (ACL)只允許授權的應用發布消息。例如,任何人都能以只讀的方式訪問我的應用的主題層級,但只有那些具有特定 憑證 的客戶端可以發布內容。
主題中不包含 自動樣式 ,也沒有方法查找客戶端可以發布的全部主題。因此,對於那些從 MQTT 匯流排消費數據的應用,你需要讓其直接使用已知的主題和消息格式樣式。
那麼應該如何設計主題呢?最佳實踐包括使用應用相關的根名稱,例如在我的應用中使用 ny-power
。接著,為提高訂閱效率,構建足夠深的層次結構。upstream
層次結構包含了直接從數據源獲取的、不經處理的原始數據,而 fuel-mix
層次結構包含特定類型的數據;我們後續還可以增加其它的層次結構。
訂閱主題
在 MQTT 中,訂閱僅僅是簡單的字元串匹配。為提高處理效率,只允許如下兩種通配符:
#
以遞歸方式匹配,直到字元串結束+
匹配下一個/
之前的內容
為便於理解,下面給出幾個例子:
ny-power/# - 匹配 ny-power 應用發布的全部主題
ny-power/upstream/# - 匹配全部原始數據的主題
ny-power/upstream/fuel-mix/+ - 匹配全部燃料類型的主題
ny-power/+/+/Hydro - 匹配全部兩次層級之後為 Hydro 類型的主題(即使不位於 upstream 層次結構下)
類似 ny-power/#
的大範圍訂閱適用於 低數據量 的應用,應用從網路獲取全部數據並處理。但對 高數據量 應用而言則是一個災難,由於絕大多數消息並不會被使用,大部分的網路帶寬被白白浪費了。
在大數據量情況下,為確保性能,應用需要使用恰當的主題篩選(如 ny-power/+/+/Hydro
)盡量準確獲取業務所需的數據。
增加我們自己的數據層次
接下來,應用中的一切都依賴於已有的 MQTT 流並構建新流。第一個額外的數據層用於計算髮電對應的二氧化碳排放。
利用 美國能源情報署 給出的 2016 年紐約各類燃料發電及排放情況,我們可以給出各類燃料的平均排放率,單位為克/兆瓦時。
上述結果被封裝到一個專用的微服務中。該微服務訂閱 ny-power/upstream/fuel-mix/+
,即數據泵中燃料組成情況的原始數據,接著完成計算並將結果(單位為克/千瓦時)發布到新的主題層次結構上:
ny-power/computed/co2 {"units": "g / kWh", "value": 152.9486, "ts": "05/09/2018 00:05:00"}
接著,另一個服務會訂閱該主題層次結構並將數據打包到 InfluxDB 進程中;同時,發布 24 小時內的時間序列數據到 ny-power/archive/co2/24h
主題,這樣可以大大簡化當前變化數據的繪製。
這種層次結構的主題模型效果不錯,可以將上述程序之間的邏輯解耦合。在複雜系統中,各個組件可能使用不同的編程語言,但這並不重要,因為交換格式都是 MQTT 消息,即主題和 JSON 格式的消息內容。
從終端消費數據
為了更好的了解 MQTT 完成了什麼工作,將其綁定到一個消息匯流排並查看消息流是個不錯的方法。mosquitto-clients
包中的 mosquitto_sub
可以讓我們輕鬆實現該目標。
安裝程序後,你需要提供伺服器名稱以及你要訂閱的主題。如果有需要,使用參數 -v
可以讓你看到有新消息發布的那些主題;否則,你只能看到主題內的消息數據。
mosquitto_sub -h mqtt.ny-power.org -t ny-power/# -v
只要我編寫或調試 MQTT 應用,我總會在一個終端中運行 mosquitto_sub
。
從網頁直接訪問 MQTT
到目前為止,我們已經有提供公開事件流的應用,可以用微服務或命令行工具訪問該應用。但考慮到互聯網仍佔據主導地位,因此讓用戶可以從瀏覽器直接獲取事件流是很重要。
MQTT 的設計者已經考慮到了這一點。協議標準支持三種不同的傳輸協議:TCP、UDP 和 WebSockets。主流瀏覽器都支持 WebSockets,可以維持持久連接,用於實時應用。
Eclipse 項目提供了 MQTT 的一個 JavaScript 實現,叫做 Paho,可包含在你的應用中。工作模式為與伺服器建立連接、建立一些訂閱,然後根據接收到的消息進行響應。
// ny-power web console application
var client = new Paho.MQTT.Client(mqttHost, Number("80"), "client-" + Math.random());
// set callback handlers
client.onMessageArrived = onMessageArrived;
// connect the client
client.reconnect = true;
client.connect({onSuccess: onConnect});
// called when the client connects
function onConnect() {
// Once a connection has been made, make a subscription and send a message.
console.log("onConnect");
client.subscribe("ny-power/computed/co2");
client.subscribe("ny-power/archive/co2/24h");
client.subscribe("ny-power/upstream/fuel-mix/#");
}
// called when a message arrives
function onMessageArrived(message) {
console.log("onMessageArrived:"+message.destinationName + message.payloadString);
if (message.destinationName == "ny-power/computed/co2") {
var data = JSON.parse(message.payloadString);
$("#co2-per-kwh").html(Math.round(data.value));
$("#co2-units").html(data.units);
$("#co2-updated").html(data.ts);
}
if (message.destinationName.startsWith("ny-power/upstream/fuel-mix")) {
fuel_mix_graph(message);
}
if (message.destinationName == "ny-power/archive/co2/24h") {
var data = JSON.parse(message.payloadString);
var plot = [
{
x: data.ts,
y: data.values,
type: 'scatter'
}
];
var layout = {
yaxis: {
title: "g CO2 / kWh",
}
};
Plotly.newPlot('co2_graph', plot, layout);
}
上述應用訂閱了不少主題,因為我們將要呈現若干種不同類型的數據;其中 ny-power/computed/co2
主題為我們提供當前二氧化碳排放的參考值。一旦收到該主題的新消息,網站上的相應內容會被相應替換。
![NYISO 二氧化碳排放圖](/data/attachment/album/201808/15/220846oejtdbotat1ar19n.png "NY ISO Grid CO2 Intensity")
ny-power.org 網站提供的 NYISO 二氧化碳排放圖。
ny-power/archive/co2/24h
主題提供了時間序列數據,用於為 Plotly 線表提供數據。ny-power/upstream/fuel-mix
主題提供當前燃料組成情況,為漂亮的柱狀圖提供數據。
![NYISO 燃料組成情況](/data/attachment/album/201808/15/220850xgbq4bgb5ogqkg7k.png "Fuel mix on NYISO grid")
ny-power.org 網站提供的燃料組成情況。
這是一個動態網站,數據不從伺服器拉取,而是結合 MQTT 消息匯流排,監聽對外開放的 WebSocket。就像數據泵和打包器程序那樣,網站頁面也是一個發布訂閱客戶端,只不過是在你的瀏覽器中執行,而不是在公有雲的微服務上。
你可以在 http://ny-power.org 站點點看到動態變更,包括圖像和可以看到消息到達的實時 MQTT 終端。
繼續深入
ny-power.org 應用的完整內容開源在 GitHub 中。你也可以查閱 架構簡介,學習如何使用 Helm 部署一系列 Kubernetes 微服務構建應用。另一個有趣的 MQTT 示例使用 MQTT 和 OpenWhisk 進行實時文本消息翻譯, 代碼模式 參考鏈接。
MQTT 被廣泛應用於物聯網領域,更多關於 MQTT 用途的例子可以在 Home Assistant 項目中找到。
如果你希望深入了解協議內容,可以從 mqtt.org 獲得該公開標準的全部細節。
想了解更多,可以參加 Sean Dague 在 OSCON 上的演講,主題為 將 MQTT 加入到你的工具箱,會議將於 7 月 16-19 日在奧爾良州波特蘭舉辦。
via: https://opensource.com/article/18/6/mqtt
作者:Sean Dague 選題:lujun9972 譯者:pinewall 校對:wxy
本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive