Linux中國

使用 ZeroMQ 消息庫在 C 和 Python 間共享數據

作為軟體工程師,我有多次在要求完成指定任務時感到渾身一冷的經歷。其中有一次,我必須在一些新的硬體基礎設施和雲基礎設施之間寫一個介面,這些硬體需要 C 語言,而雲基礎設施主要是用 Python。

實現的方式之一是 用 C 寫擴展模塊,Python 支持 C 擴展的調用。快速瀏覽文檔後發現,這需要編寫大量的 C 代碼。這樣做的話,在有些情況下效果還不錯,但不是我喜歡的方式。另一種方式就是將兩個任務放在不同的進程中,並使用 ZeroMQ 消息庫 在兩者之間交換消息。

在發現 ZeroMQ 之前,遇到這種類型的情況時,我選擇了編寫擴展的方式。這種方式不算太差,但非常費時費力。如今,為了避免那些問題,我將一個系統細分為獨立的進程,通過 通信套接字 發送消息來交換信息。這樣,不同的編程語言可以共存,每個進程也變簡單了,同時也容易調試。

ZeroMQ 提供了一個更簡單的過程:

  1. 編寫一小段 C 代碼,從硬體讀取數據,然後把發現的東西作為消息發送出去。
  2. 使用 Python 編寫介面,實現新舊基礎設施之間的對接。

Pieter Hintjens 是 ZeroMQ 項目發起者之一,他是個擁有 有趣視角和作品 的非凡人物。

準備

本教程中,需要:

Fedora 系統上的安裝方法:

$ dnf install clang zeromq zeromq-devel python3 python3-zmq

Debian 和 Ubuntu 系統上的安裝方法:

$ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq

如果有問題,參考對應項目的安裝指南(上面附有鏈接)。

編寫硬體介面庫

因為這裡針對的是個設想的場景,本教程虛構了包含兩個函數的操作庫:

  • fancyhw_init() 用來初始化(設想的)硬體
  • fancyhw_read_val() 用於返回從硬體讀取的數據

將庫的完整代碼保存到文件 libfancyhw.h 中:

#ifndef LIBFANCYHW_H
#define LIBFANCYHW_H

#include <stdlib.h>
#include <stdint.h>

// This is the fictitious hardware interfacing library

void fancyhw_init(unsigned int init_param)
{
    srand(init_param);
}

int16_t fancyhw_read_val(void)
{
    return (int16_t)rand();
}

#endif

這個庫可以模擬你要在不同語言實現的組件間交換的數據,中間有個隨機數發生器。

設計 C 介面

下面從包含管理數據傳輸的庫開始,逐步實現 C 介面。

需要的庫

開始先載入必要的庫(每個庫的作用見代碼注釋):

// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy()
#include <string.h>
// For sleep()
#include <unistd.h>

#include <zmq.h>

#include "libfancyhw.h"

必要的參數

定義 main 函數和後續過程中必要的參數:

int main(void)
{
    const unsigned int INIT_PARAM = 12345;
    const unsigned int REPETITIONS = 10;
    const unsigned int PACKET_SIZE = 16;
    const char *TOPIC = "fancyhw_data";

    ...

初始化

所有的庫都需要初始化。虛構的那個只需要一個參數:

fancyhw_init(INIT_PARAM);

ZeroMQ 庫需要實打實的初始化。首先,定義對象 context,它是用來管理全部的套接字的:

void *context = zmq_ctx_new();

if (!context)
{
    printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %sn", zmq_strerror(errno));

    return EXIT_FAILURE;
}

之後定義用來發送數據的套接字。ZeroMQ 支持若干種套接字,各有其用。使用 publish 套接字(也叫 PUB 套接字),可以複製消息並分發到多個接收端。這使得你可以讓多個接收端接收同一個消息。沒有接收者的消息將被丟棄(即不會入消息隊列)。用法如下:

void *data_socket = zmq_socket(context, ZMQ_PUB);

套接字需要綁定到一個具體的地址,這樣客戶端就知道要連接哪裡了。本例中,使用了 TCP 傳輸層(當然也有 其它選項,但 TCP 是不錯的默認選擇):

const int rb = zmq_bind(data_socket, "tcp://*:5555");

if (rb != 0)
{
    printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %sn", zmq_strerror(errno));

    return EXIT_FAILURE;
}

下一步, 計算一些後續要用到的值。 注意下面代碼中的 TOPIC,因為 PUB 套接字發送的消息需要綁定一個主題。主題用於供接收者過濾消息:

const size_t topic_size = strlen(TOPIC);
const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);

printf("Topic: %s; topic size: %zu; Envelope size: %zun", TOPIC, topic_size, envelope_size);

發送消息

啟動一個發送消息的循環,循環 REPETITIONS 次:

for (unsigned int i = 0; i < REPETITIONS; i++)
{
    ...

發送消息前,先填充一個長度為 PACKET_SIZE 的緩衝區。本庫提供的是 16 個位的有符號整數。因為 C 語言中 int 類型佔用空間大小與平台相關,不是確定的值,所以要使用指定寬度的 int 變數:

int16_t buffer[PACKET_SIZE];

for (unsigned int j = 0; j < PACKET_SIZE; j++)
{
    buffer[j] = fancyhw_read_val();
}

printf("Read %u data valuesn", PACKET_SIZE);

消息的準備和發送的第一步是創建 ZeroMQ 消息,為消息分配必要的內存空間。空白的消息是用於封裝要發送的數據的:

zmq_msg_t envelope;

const int rmi = zmq_msg_init_size(&envelope, envelope_size);
if (rmi != 0)
{
    printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %sn", zmq_strerror(errno));

    zmq_msg_close(&envelope);

    break;
}

現在內存空間已分配,數據保存在 ZeroMQ 消息 「信封」中。函數 MARKDOWN_HASHbfa36c295a8f8e9bb9c0884a0da91c48MARKDOWNHASH 返回一個指向封裝數據緩存區頂端的指針。第一部分是主題,之後是一個空格,最後是二進位數。主題和二進位數據之間的分隔符採用空格字元。需要遍歷緩存區的話,使用類型轉換和 [指針演算法](https://en.wikipedia.org/wiki/Pointer%28computer_programming%29%23C_and_C++)。(感謝 C 語言,讓事情變得直截了當。)做法如下:

memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t))

通過 data_socket 發送消息:

const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
if (rs != envelope_size)
{
    printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %sn", zmq_strerror(errno));

    zmq_msg_close(&envelope);

    break;
}

使用數據之前要先解除封裝:

zmq_msg_close(&envelope);

printf("Message sent; i: %u, topic: %sn", i, TOPIC);

清理

C 語言不提供 垃圾收集 功能,用完之後記得要自己掃尾。發送消息之後結束程序之前,需要運行掃尾代碼,釋放分配的內存:

const int rc = zmq_close(data_socket);

if (rc != 0)
{
    printf("ERROR: ZeroMQ error occurred during zmq_close(): %sn", zmq_strerror(errno));

    return EXIT_FAILURE;
}

const int rd = zmq_ctx_destroy(context);

if (rd != 0)
{
    printf("Error occurred during zmq_ctx_destroy(): %sn", zmq_strerror(errno));

    return EXIT_FAILURE;
}

return EXIT_SUCCESS;

完整 C 代碼

保存下面完整的介面代碼到本地名為 hw_interface.c 的文件:

// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy()
#include <string.h>
// For sleep()
#include <unistd.h>

#include <zmq.h>

#include "libfancyhw.h"

int main(void)
{
    const unsigned int INIT_PARAM = 12345;
    const unsigned int REPETITIONS = 10;
    const unsigned int PACKET_SIZE = 16;
    const char *TOPIC = "fancyhw_data";

    fancyhw_init(INIT_PARAM);

    void *context = zmq_ctx_new();

    if (!context)
    {
        printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %sn", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    void *data_socket = zmq_socket(context, ZMQ_PUB);

    const int rb = zmq_bind(data_socket, "tcp://*:5555");

    if (rb != 0)
    {
        printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %sn", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    const size_t topic_size = strlen(TOPIC);
    const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);

    printf("Topic: %s; topic size: %zu; Envelope size: %zun", TOPIC, topic_size, envelope_size);

    for (unsigned int i = 0; i < REPETITIONS; i++)
    {
        int16_t buffer[PACKET_SIZE];

        for (unsigned int j = 0; j < PACKET_SIZE; j++)
        {
            buffer[j] = fancyhw_read_val();
        }

        printf("Read %u data valuesn", PACKET_SIZE);

        zmq_msg_t envelope;

        const int rmi = zmq_msg_init_size(&envelope, envelope_size);
        if (rmi != 0)
        {
            printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %sn", zmq_strerror(errno));

            zmq_msg_close(&envelope);

            break;
        }

        memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);

        memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);

        memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));

        const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
        if (rs != envelope_size)
        {
            printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %sn", zmq_strerror(errno));

            zmq_msg_close(&envelope);

            break;
        }

        zmq_msg_close(&envelope);

        printf("Message sent; i: %u, topic: %sn", i, TOPIC);

        sleep(1);
    }

    const int rc = zmq_close(data_socket);

    if (rc != 0)
    {
        printf("ERROR: ZeroMQ error occurred during zmq_close(): %sn", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    const int rd = zmq_ctx_destroy(context);

    if (rd != 0)
    {
        printf("Error occurred during zmq_ctx_destroy(): %sn", zmq_strerror(errno));

        return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}

用如下命令編譯:

$ clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface

如果沒有編譯錯誤,你就可以運行這個介面了。貼心的是,ZeroMQ PUB 套接字可以在沒有任何應用發送或接受數據的狀態下運行,這簡化了使用複雜度,因為這樣不限制進程啟動的次序。

運行該介面:

$ ./hw_interface
Topic: fancyhw_data; topic size: 12; Envelope size: 45
Read 16 data values
Message sent; i: 0, topic: fancyhw_data
Read 16 data values
Message sent; i: 1, topic: fancyhw_data
Read 16 data values
...
...

輸出顯示數據已經通過 ZeroMQ 完成發送,現在要做的是讓一個程序去讀數據。

編寫 Python 數據處理器

現在已經準備好從 C 程序向 Python 應用傳送數據了。

需要兩個庫幫助實現數據傳輸。首先是 ZeroMQ 的 Python 封裝:

$ python3 -m pip install zmq

另一個就是 struct 庫,用於解碼二進位數據。這個庫是 Python 標準庫的一部分,所以不需要使用 pip 命令安裝。

Python 程序的第一部分是導入這些庫:

import zmq
import struct

重要參數

使用 ZeroMQ 時,只能向常量 TOPIC 定義相同的接收端發送消息:

topic = "fancyhw_data".encode(&apos;ascii&apos;)

print("Reading messages with topic: {}".format(topic))

初始化

下一步,初始化上下文和套接字。使用 subscribe 套接字(也稱為 SUB 套接字),它是 PUB 套接字的天生伴侶。這個套接字發送時也需要匹配主題。

with zmq.Context() as context:
    socket = context.socket(zmq.SUB)

    socket.connect("tcp://127.0.0.1:5555")
    socket.setsockopt(zmq.SUBSCRIBE, topic)

    i = 0

    ...

接收消息

啟動一個無限循環,等待接收發送到 SUB 套接字的新消息。這個循環會在你按下 Ctrl+C 組合鍵或者內部發生錯誤時終止:

    try:
        while True:

            ... # we will fill this in next

    except KeyboardInterrupt:
        socket.close()
    except Exception as error:
        print("ERROR: {}".format(error))
        socket.close()

這個循環等待 recv() 方法獲取的新消息,然後將接收到的內容從第一個空格字元處分割開,從而得到主題:

binary_topic, data_buffer = socket.recv().split(b&apos; &apos;, 1)

解碼消息

Python 此時尚不知道主題是個字元串,使用標準 ASCII 編解碼器進行解碼:

topic = binary_topic.decode(encoding = &apos;ascii&apos;)

print("Message {:d}:".format(i))
print("ttopic: &apos;{}&apos;".format(topic))

下一步就是使用 struct 庫讀取二進位數據,它可以將二進位數據段轉換為明確的數值。首先,計算數據包中數值的組數。本例中使用的 16 個位的有符號整數對應的是 struct 格式字元 中的 h

packet_size = len(data_buffer) // struct.calcsize("h")

print("tpacket size: {:d}".format(packet_size))

知道數據包中有多少組數據後,就可以通過構建一個包含數據組數和數據類型的字元串,來定義格式了(比如「16h」):

struct_format = "{:d}h".format(packet_size)

將二進位數據串轉換為可直接列印的一系列數字:

data = struct.unpack(struct_format, data_buffer)

print("tdata: {}".format(data))

完整 Python 代碼

下面是 Python 實現的完整的接收端:

#! /usr/bin/env python3

import zmq
import struct

topic = "fancyhw_data".encode(&apos;ascii&apos;)

print("Reading messages with topic: {}".format(topic))

with zmq.Context() as context:
    socket = context.socket(zmq.SUB)

    socket.connect("tcp://127.0.0.1:5555")
    socket.setsockopt(zmq.SUBSCRIBE, topic)

    i = 0

    try:
        while True:
            binary_topic, data_buffer = socket.recv().split(b&apos; &apos;, 1)

            topic = binary_topic.decode(encoding = &apos;ascii&apos;)

            print("Message {:d}:".format(i))
            print("ttopic: &apos;{}&apos;".format(topic))

            packet_size = len(data_buffer) // struct.calcsize("h")

            print("tpacket size: {:d}".format(packet_size))

            struct_format = "{:d}h".format(packet_size)

            data = struct.unpack(struct_format, data_buffer)

            print("tdata: {}".format(data))

            i += 1

    except KeyboardInterrupt:
        socket.close()
    except Exception as error:
        print("ERROR: {}".format(error))
        socket.close()

將上面的內容保存到名為 online_analysis.py 的文件。Python 代碼不需要編譯,你可以直接運行它。

運行輸出如下:

$ ./online_analysis.py
Reading messages with topic: b&apos;fancyhw_data&apos;
Message 0:
        topic: &apos;fancyhw_data&apos;
        packet size: 16
        data: (20946, -23616, 9865, 31416, -15911, -10845, -5332, 25662, 10955, -32501, -18717, -24490, -16511, -28861, 24205, 26568)
Message 1:
        topic: &apos;fancyhw_data&apos;
        packet size: 16
        data: (12505, 31355, 14083, -19654, -9141, 14532, -25591, 31203, 10428, -25564, -732, -7979, 9529, -27982, 29610, 30475)
...
...

小結

本教程介紹了一種新方式,實現從基於 C 的硬體介面收集數據,並分發到基於 Python 的基礎設施的功能。藉此可以獲取數據供後續分析,或者轉送到任意數量的接收端去。它採用了一個消息庫實現數據在發送者和處理者之間的傳送,來取代同樣功能規模龐大的軟體。

本教程還引出了我稱之為「軟體粒度」的概念,換言之,就是將軟體細分為更小的部分。這種做法的優點之一就是,使得同時採用不同的編程語言實現最簡介面作為不同部分之間溝通的組件成為可能。

實踐中,這種設計使得軟體工程師能以更獨立、合作更高效的方式做事。不同的團隊可以專註於數據分析的不同方面,可以選擇自己中意的實現工具。這種做法的另一個優點是實現了零代價的並行,因為所有的進程都可以並行運行。ZeroMQ 消息庫 是個令人讚歎的軟體,使用它可以讓工作大大簡化。

via: https://opensource.com/article/20/3/zeromq-c-python

作者:Cristiano L. Fontana 選題:lujun9972 譯者:silentdawn-zz 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出


本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive

對這篇文章感覺如何?

太棒了
0
不錯
0
愛死了
0
不太好
0
感覺很糟
0
雨落清風。心向陽

    You may also like

    Leave a reply

    您的郵箱地址不會被公開。 必填項已用 * 標註

    此站點使用Akismet來減少垃圾評論。了解我們如何處理您的評論數據

    More in:Linux中國