Linux中國

構建一個即時消息應用(五):實時消息

本文是該系列的第五篇。

對於實時消息,我們將使用 伺服器發送事件 Server-Sent Events 。這是一個打開的連接,我們可以在其中傳輸數據流。我們會有個端點,用戶會在其中訂閱發送給他的所有消息。

消息戶端

在 HTTP 部分之前,讓我們先編寫一個 映射 map ,讓所有客戶端都監聽消息。 像這樣全局初始化:

type MessageClient struct {
    Messages chan Message
    UserID   string
}

var messageClients sync.Map

已創建的新消息

還記得在 上一篇文章 中,當我們創建這條消息時,我們留下了一個 「TODO」 注釋。在那裡,我們將使用這個函數來調度一個 goroutine。

go messageCreated(message)

把這行代碼插入到我們留注釋的位置。

func messageCreated(message Message) error {
    if err := db.QueryRow(`
 SELECT user_id FROM participants
 WHERE user_id != $1 and conversation_id = $2
 `, message.UserID, message.ConversationID).
    Scan(&message.ReceiverID); err != nil {
        return err
    }

    go broadcastMessage(message)

    return nil
}

func broadcastMessage(message Message) {
    messageClients.Range(func(key, _ interface{}) bool {
        client := key.(*MessageClient)
        if client.UserID == message.ReceiverID {
            client.Messages <- message
        }
        return true
    })
}

該函數查詢接收者 ID(其他參與者 ID),並將消息發送給所有客戶端。

訂閱消息

讓我們轉到 main() 函數並添加以下路由:

router.HandleFunc("GET", "/api/messages", guard(subscribeToMessages))

此端點處理 /api/messages 上的 GET 請求。請求應該是一個 EventSource 連接。它用一個事件流響應,其中的數據是 JSON 格式的。

func subscribeToMessages(w http.ResponseWriter, r *http.Request) {
    if a := r.Header.Get("Accept"); !strings.Contains(a, "text/event-stream") {
        http.Error(w, "This endpoint requires an EventSource connection", http.StatusNotAcceptable)
        return
    }

    f, ok := w.(http.Flusher)
    if !ok {
        respondError(w, errors.New("streaming unsupported"))
        return
    }

    ctx := r.Context()
    authUserID := ctx.Value(keyAuthUserID).(string)

    h := w.Header()
    h.Set("Cache-Control", "no-cache")
    h.Set("Connection", "keep-alive")
    h.Set("Content-Type", "text/event-stream")

    messages := make(chan Message)
    defer close(messages)

    client := &MessageClient{Messages: messages, UserID: authUserID}
    messageClients.Store(client, nil)
    defer messageClients.Delete(client)

    for {
        select {
        case <-ctx.Done():
            return
        case message := <-messages:
            if b, err := json.Marshal(message); err != nil {
                log.Printf("could not marshall message: %vn", err)
                fmt.Fprintf(w, "event: errorndata: %vnn", err)
            } else {
                fmt.Fprintf(w, "data: %snn", b)
            }
            f.Flush()
        }
    }
}

首先,它檢查請求頭是否正確,並檢查伺服器是否支持流式傳輸。我們創建一個消息通道,用它來構建一個客戶端,並將其存儲在客戶端映射中。每當創建新消息時,它都會進入這個通道,因此我們可以通過 for-select 循環從中讀取。

伺服器發送事件 Server-Sent Events 使用以下格式發送數據:

data: some data herenn

我們以 JSON 格式發送:

data: {"foo":"bar"}nn

我們使用 fmt.Fprintf() 以這種格式寫入響應 寫入器 writter ,並在循環的每次迭代中刷新數據。

這個循環會一直運行,直到使用請求上下文關閉連接為止。我們延遲了通道的關閉和客戶端的刪除,因此,當循環結束時,通道將被關閉,客戶端不會收到更多的消息。

注意, 伺服器發送事件 Server-Sent Events (EventSource)的 JavaScript API 不支持設置自定義請求頭?,所以我們不能設置 Authorization: Bearer <token>。這就是為什麼 guard() 中間件也會從 URL 查詢字元串中讀取令牌的原因。

實時消息部分到此結束。我想說的是,這就是後端的全部內容。但是為了編寫前端代碼,我將再增加一個登錄端點:一個僅用於開發的登錄。

via: https://nicolasparada.netlify.com/posts/go-messenger-realtime-messages/

作者:Nicolás Parada 選題:lujun9972 譯者:gxlct008 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出


本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive

對這篇文章感覺如何?

太棒了
0
不錯
0
愛死了
0
不太好
0
感覺很糟
0
雨落清風。心向陽

    You may also like

    Leave a reply

    您的郵箱地址不會被公開。 必填項已用 * 標註

    此站點使用Akismet來減少垃圾評論。了解我們如何處理您的評論數據

    More in:Linux中國