Linux中國

使用 Kafka 和 MongoDB 進行 Go 非同步處理

在我前面的博客文章 「我的第一個 Go 微服務:使用 MongoDB 和 Docker 多階段構建」 中,我創建了一個 Go 微服務示例,它發布一個 REST 式的 http 端點,並將從 HTTP POST 中接收到的數據保存到 MongoDB 資料庫。

在這個示例中,我將數據的保存和 MongoDB 分離,並創建另一個微服務去處理它。我還添加了 Kafka 為消息層服務,這樣微服務就可以非同步處理它自己關心的東西了。

如果你有時間去看,我將這個博客文章的整個過程錄製到 這個視頻中了 🙂

下面是這個使用了兩個微服務的簡單的非同步處理示例的上層架構圖。

rest-kafka-mongo-microservice-draw-io

微服務 1 —— 是一個 REST 式微服務,它從一個 /POST http 調用中接收數據。接收到請求之後,它從 http 請求中檢索數據,並將它保存到 Kafka。保存之後,它通過 /POST 發送相同的數據去響應調用者。

微服務 2 —— 是一個訂閱了 Kafka 中的一個主題的微服務,微服務 1 的數據保存在該主題。一旦消息被微服務消費之後,它接著保存數據到 MongoDB 中。

在你繼續之前,我們需要能夠去運行這些微服務的幾件東西:

  1. 下載 Kafka —— 我使用的版本是 kafka_2.11-1.1.0
  2. 安裝 librdkafka —— 不幸的是,這個庫應該在目標系統中
  3. 安裝 Kafka Go 客戶端
  4. 運行 MongoDB。你可以去看我的 以前的文章 中關於這一塊的內容,那篇文章中我使用了一個 MongoDB docker 鏡像。

我們開始吧!

首先,啟動 Kafka,在你運行 Kafka 伺服器之前,你需要運行 Zookeeper。下面是示例:

$ cd /<download path>/kafka_2.11-1.1.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties

接著運行 Kafka —— 我使用 9092 埠連接到 Kafka。如果你需要改變埠,只需要在 config/server.properties 中配置即可。如果你像我一樣是個新手,我建議你現在還是使用默認埠。

$ bin/kafka-server-start.sh config/server.properties

Kafka 跑起來之後,我們需要 MongoDB。它很簡單,只需要使用這個 docker-compose.yml 即可。

version: &apos;3&apos;
services:
  mongodb:
    image: mongo
    ports:
      - "27017:27017"
    volumes:
      - "mongodata:/data/db"
    networks:
      - network1

volumes:
   mongodata:

networks:
   network1:

使用 Docker Compose 去運行 MongoDB docker 容器。

docker-compose up

這裡是微服務 1 的相關代碼。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

func jobsPostHandler(w http.ResponseWriter, r *http.Request) {

    //Retrieve body from http request
    b, err := ioutil.ReadAll(r.Body)
    defer r.Body.Close()
    if err != nil {
        panic(err)
    }

    //Save data into Job struct
    var _job Job
    err = json.Unmarshal(b, &_job)
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    saveJobToKafka(_job)

    //Convert job struct into json
    jsonString, err := json.Marshal(_job)
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    //Set content-type http header
    w.Header().Set("content-type", "application/json")

    //Send back data as response
    w.Write(jsonString)

}

func saveJobToKafka(job Job) {

    fmt.Println("save to kafka")

    jsonString, err := json.Marshal(job)

    jobString := string(jsonString)
    fmt.Print(jobString)

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        panic(err)
    }

    // Produce messages to topic (asynchronously)
    topic := "jobs-topic1"
    for _, word := range []string{string(jobString)} {
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }
}

這裡是微服務 2 的代碼。在這個代碼中最重要的東西是從 Kafka 中消費數據,保存部分我已經在前面的博客文章中討論過了。這裡代碼的重點部分是從 Kafka 中消費數據:

kafka-to-mongo/kafka-mongo-sample.go

func main() {

    //Create MongoDB session
    session := initialiseMongo()
    mongoStore.session = session

    receiveFromKafka()

}

func receiveFromKafka() {

    fmt.Println("Start receiving from Kafka")
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "group-id-1",
        "auto.offset.reset": "earliest",
    })

    if err != nil {
        panic(err)
    }

    c.SubscribeTopics([]string{"jobs-topic1"}, nil)

    for {
        msg, err := c.ReadMessage(-1)

        if err == nil {
            fmt.Printf("Received from Kafka %s: %sn", msg.TopicPartition, string(msg.Value))
            job := string(msg.Value)
            saveJobToMongo(job)
        } else {
            fmt.Printf("Consumer error: %v (%v)n", err, msg)
            break
        }
    }

    c.Close()

}

func saveJobToMongo(jobString string) {

    fmt.Println("Save to MongoDB")
    col := mongoStore.session.DB(database).C(collection)

    //Save data into Job struct
    var _job Job
    b := []byte(jobString)
    err := json.Unmarshal(b, &_job)
    if err != nil {
        panic(err)
    }

    //Insert job into MongoDB
    errMongo := col.Insert(_job)
    if errMongo != nil {
        panic(errMongo)
    }

    fmt.Printf("Saved to MongoDB : %s", jobString)

}

我們來演示一下,運行微服務 1。確保 Kafka 已經運行了。

$ go run rest-kafka-sample.go

我使用 Postman 向微服務 1 發送數據。

Screenshot-2018-04-29-22.20.33

這裡是日誌,你可以在微服務 1 中看到。當你看到這些的時候,說明已經接收到了來自 Postman 發送的數據,並且已經保存到了 Kafka。

Screenshot-2018-04-29-22.22.00

因為我們尚未運行微服務 2,數據被微服務 1 只保存在了 Kafka。我們來消費它並通過運行的微服務 2 來將它保存到 MongoDB。

$ go run kafka-mongo-sample.go

現在,你將在微服務 2 上看到消費的數據,並將它保存到了 MongoDB。

Screenshot-2018-04-29-22.24.15

檢查一下數據是否保存到了 MongoDB。如果有數據,我們成功了!

Screenshot-2018-04-29-22.26.39

完整的源代碼可以在這裡找到:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice

現在是廣告時間:如果你喜歡這篇文章,請在 Twitter @donvito 上關注我。我的 Twitter 上有關於 Docker、Kubernetes、GoLang、Cloud、DevOps、Agile 和 Startups 的內容。歡迎你們在 GitHubLinkedIn 關注我。

開心地玩吧!

via: https://www.melvinvivas.com/developing-microservices-using-kafka-and-mongodb/

作者:Melvin Vivas 譯者:qhwdw 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出


本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive

對這篇文章感覺如何?

太棒了
0
不錯
0
愛死了
0
不太好
0
感覺很糟
0
雨落清風。心向陽

    You may also like

    Leave a reply

    您的電子郵箱地址不會被公開。 必填項已用 * 標註

    此站點使用Akismet來減少垃圾評論。了解我們如何處理您的評論數據

    More in:Linux中國