Linux 下的進程間通信:使用管道和消息隊列
本篇是 Linux 下進程間通信(IPC)系列的第二篇文章。第一篇文章 聚焦於通過共享文件和共享內存段這樣的共享存儲來進行 IPC。這篇文件的重點將轉向管道,它是連接需要通信的進程之間的通道。管道擁有一個寫端用於寫入位元組數據,還有一個讀端用於按照先入先出的順序讀入這些位元組數據。而這些位元組數據可能代表任何東西:數字、員工記錄、數字電影等等。
管道有兩種類型,命名管道和無名管道,都可以互動式的在命令行或程序中使用它們;相關的例子在下面展示。這篇文章也將介紹內存隊列,儘管它們有些過時了,但它們不應該受這樣的待遇。
在本系列的第一篇文章中的示例代碼承認了在 IPC 中可能受到競爭條件(不管是基於文件的還是基於內存的)的威脅。自然地我們也會考慮基於管道的 IPC 的安全並發問題,這個也將在本文中提及。針對管道和內存隊列的例子將會使用 POSIX 推薦使用的 API,POSIX 的一個核心目標就是線程安全。
請查看一些 mq_open 函數的 man 頁,這個函數屬於內存隊列的 API。這個 man 頁中有關 特性 的章節帶有一個小表格:
介面 | 特性 | 值 |
---|---|---|
mq_open() |
線程安全 | MT-Safe |
上面的 MT-Safe(MT 指的是 多線程 )意味著 mq_open
函數是線程安全的,進而暗示是進程安全的:一個進程的執行和它的一個線程執行的過程類似,假如競爭條件不會發生在處於相同進程的線程中,那麼這樣的條件也不會發生在處於不同進程的線程中。MT-Safe 特性保證了調用 mq_open
時不會出現競爭條件。一般來說,基於通道的 IPC 是並發安全的,儘管在下面例子中會出現一個有關警告的注意事項。
無名管道
首先讓我們通過一個特意構造的命令行例子來展示無名管道是如何工作的。在所有的現代系統中,符號 |
在命令行中都代表一個無名管道。假設我們的命令行提示符為 %
,接下來考慮下面的命令:
## 寫入方在 | 左邊,讀取方在右邊
% sleep 5 | echo "Hello, world!"
sleep
和 echo
程序以不同的進程執行,無名管道允許它們進行通信。但是上面的例子被特意設計為沒有通信發生。問候語 「Hello, world!」 出現在屏幕中,然後過了 5 秒後,命令行返回,暗示 sleep
和 echo
進程都已經結束了。這期間發生了什麼呢?
在命令行中的豎線 |
的語法中,左邊的進程(sleep
)是寫入方,右邊的進程(echo
)為讀取方。默認情況下,讀取方將會阻塞,直到從通道中能夠讀取到位元組數據,而寫入方在寫完它的位元組數據後,將發送 流已終止 的標誌。(即便寫入方過早終止了,一個流已終止的標誌還是會發給讀取方。)無名管道將保持到寫入方和讀取方都停止的那個時刻。
在上面的例子中,sleep
進程並沒有向通道寫入任何的位元組數據,但在 5 秒後就終止了,這時將向通道發送一個流已終止的標誌。與此同時,echo
進程立即向標準輸出(屏幕)寫入問候語,因為這個進程並不從通道中讀入任何位元組,所以它並沒有等待。一旦 sleep
和 echo
進程都終止了,不會再用作通信的無名管道將會消失然後返回命令行提示符。
下面這個更加實用的示例將使用兩個無名管道。我們假定文件 test.dat
的內容如下:
this
is
the
way
the
world
ends
下面的命令:
% cat test.dat | sort | uniq
會將 cat
( 連接 的縮寫)進程的輸出通過管道傳給 sort
進程以生成排序後的輸出,然後將排序後的輸出通過管道傳給 uniq
進程以消除重複的記錄(在本例中,會將兩次出現的 「the」 縮減為一個):
ends
is
the
this
way
world
下面展示的情景展示的是一個帶有兩個進程的程序通過一個無名管道通信來進行通信。
示例 1. 兩個進程通過一個無名管道來進行通信
#include <sys/wait.h> /* wait */
#include <stdio.h>
#include <stdlib.h> /* exit functions */
#include <unistd.h> /* read, write, pipe, _exit */
#include <string.h>
#define ReadEnd 0
#define WriteEnd 1
void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /** failure **/
}
int main() {
int pipeFDs[2]; /* two file descriptors */
char buf; /* 1-byte buffer */
const char* msg = "Nature's first green is goldn"; /* bytes to write */
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
pid_t cpid = fork(); /* fork a child process */
if (cpid < 0) report_and_exit("fork"); /* check for failure */
if (0 == cpid) { /*** child ***/ /* child process */
close(pipeFDs[WriteEnd]); /* child reads, doesn't write */
while (read(pipeFDs[ReadEnd], &buf, 1) > 0) /* read until end of byte stream */
write(STDOUT_FILENO, &buf, sizeof(buf)); /* echo to the standard output */
close(pipeFDs[ReadEnd]); /* close the ReadEnd: all done */
_exit(0); /* exit and notify parent at once */
}
else { /*** parent ***/
close(pipeFDs[ReadEnd]); /* parent writes, doesn't read */
write(pipeFDs[WriteEnd], msg, strlen(msg)); /* write the bytes to the pipe */
close(pipeFDs[WriteEnd]); /* done writing: generate eof */
wait(NULL); /* wait for child to exit */
exit(0); /* exit normally */
}
return 0;
}
上面名為 pipeUN
的程序使用系統函數 fork
來創建一個進程。儘管這個程序只有一個單一的源文件,在它正確執行的情況下將會發生多進程的情況。
下面的內容是對庫函數
fork
如何工作的一個簡要回顧:
fork
函數由父進程調用,在失敗時返回-1
給父進程。在pipeUN
這個例子中,相應的調用是:pid_t cpid = fork(); /* called in parent */
函數調用後的返回值也被保存下來了。在這個例子中,保存在整數類型
pid_t
的變數cpid
中。(每個進程有它自己的進程 ID,這是一個非負的整數,用來標記進程)。復刻一個新的進程可能會因為多種原因而失敗,包括進程表滿了的原因,這個結構由系統維持,以此來追蹤進程狀態。明確地說,殭屍進程假如沒有被處理掉,將可能引起進程表被填滿的錯誤。
假如
fork
調用成功,則它將創建一個新的子進程,向父進程返回一個值,向子進程返回另外的一個值。在調用fork
後父進程和子進程都將執行相同的代碼。(子進程繼承了到此為止父進程中聲明的所有變數的拷貝),特別地,一次成功的fork
調用將返回如下的東西:
- 向子進程返回
0
- 向父進程返回子進程的進程 ID
- 在一次成功的
fork
調用後,一個if
/else
或等價的結構將會被用來隔離針對父進程和子進程的代碼。在這個例子中,相應的聲明為:if (0 == cpid) { /*** child ***/ ... } else { /*** parent ***/ ... }
假如成功地復刻出了一個子進程,pipeUN
程序將像下面這樣去執行。在一個整數的數列里:
int pipeFDs[2]; /* two file descriptors */
來保存兩個文件描述符,一個用來向管道中寫入,另一個從管道中寫入。(數組元素 pipeFDs[0]
是讀端的文件描述符,元素 pipeFDs[1]
是寫端的文件描述符。)在調用 fork
之前,對系統 pipe
函數的成功調用,將立刻使得這個數組獲得兩個文件描述符:
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
父進程和子進程現在都有了文件描述符的副本。但分離關注點模式意味著每個進程恰好只需要一個描述符。在這個例子中,父進程負責寫入,而子進程負責讀取,儘管這樣的角色分配可以反過來。在 if
子句中的第一個語句將用於關閉管道的讀端:
close(pipeFDs[WriteEnd]); /* called in child code */
在父進程中的 else
子句將會關閉管道的讀端:
close(pipeFDs[ReadEnd]); /* called in parent code */
然後父進程將向無名管道中寫入某些位元組數據(ASCII 代碼),子進程讀取這些數據,然後向標準輸出中回放它們。
在這個程序中還需要澄清的一點是在父進程代碼中的 wait
函數。一旦被創建後,子進程很大程度上獨立於它的父進程,正如簡短的 pipeUN
程序所展示的那樣。子進程可以執行任意的代碼,而它們可能與父進程完全沒有關係。但是,假如當子進程終止時,系統將會通過一個信號來通知父進程。
要是父進程在子進程之前終止又該如何呢?在這種情形下,除非採取了預防措施,子進程將會變成在進程表中的一個殭屍進程。預防措施有兩大類型:第一種是讓父進程去通知系統,告訴系統它對子進程的終止沒有任何興趣:
signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */
第二種方法是在子進程終止時,讓父進程執行一個 wait
。這樣就確保了父進程可以獨立於子進程而存在。在 pipeUN
程序中使用了第二種方法,其中父進程的代碼使用的是下面的調用:
wait(NULL); /* called in parent */
這個對 wait
的調用意味著一直等待直到任意一個子進程的終止發生,因此在 pipeUN
程序中,只有一個子進程。(其中的 NULL
參數可以被替換為一個保存有子程序退出狀態的整數變數的地址。)對於更細粒度的控制,還可以使用更靈活的 waitpid
函數,例如特別指定多個子進程中的某一個。
pipeUN
將會採取另一個預防措施。當父進程結束了等待,父進程將會調用常規的 exit
函數去退出。對應的,子進程將會調用 _exit
變種來退出,這類變種將快速跟蹤終止相關的通知。在效果上,子進程會告訴系統立刻去通知父進程它的這個子進程已經終止了。
假如兩個進程向相同的無名管道中寫入內容,位元組數據會交錯嗎?例如,假如進程 P1 向管道寫入內容:
foo bar
同時進程 P2 並發地寫入:
baz baz
到相同的管道,最後的結果似乎是管道中的內容將會是任意錯亂的,例如像這樣:
baz foo baz bar
只要沒有寫入超過 PIPE_BUF
位元組,POSIX 標準就能確保寫入不會交錯。在 Linux 系統中, PIPE_BUF
的大小是 4096 位元組。對於管道我更喜歡只有一個寫入方和一個讀取方,從而繞過這個問題。
命名管道
無名管道沒有備份文件:系統將維持一個內存緩存來將位元組數據從寫方傳給讀方。一旦寫方和讀方終止,這個緩存將會被回收,進而無名管道消失。相反的,命名管道有備份文件和一個不同的 API。
下面讓我們通過另一個命令行示例來了解命名管道的要點。下面是具體的步驟:
- 開啟兩個終端。這兩個終端的工作目錄應該相同。
- 在其中一個終端中,鍵入下面的兩個命令(命令行提示符仍然是
%
,我的注釋以##
打頭。):
% mkfifo tester ## 創建一個備份文件,名為 tester
% cat tester ## 將管道的內容輸出到 stdout
在最開始,沒有任何東西會出現在終端中,因為到現在為止沒有在命名管道中寫入任何東西。
- 在第二個終端中輸入下面的命令:
% cat > tester ## redirect keyboard input to the pipe
hello, world! ## then hit Return key
bye, bye ## ditto
<Control-C> ## terminate session with a Control-C
無論在這個終端中輸入什麼,它都會在另一個終端中顯示出來。一旦鍵入 Ctrl+C
,就會回到正常的命令行提示符,因為管道已經被關閉了。
- 通過移除實現命名管道的文件來進行清理:
% unlink tester
正如 mkfifo
程序的名字所暗示的那樣,命名管道也被叫做 FIFO,因為第一個進入的位元組,就會第一個出,其他的類似。有一個名為 mkfifo
的庫函數,用它可以在程序中創建一個命名管道,它將在下一個示例中被用到,該示例由兩個進程組成:一個向命名管道寫入,而另一個從該管道讀取。
示例 2. fifoWriter 程序
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
#define MaxLoops 12000 /* outer loop */
#define ChunkSize 16 /* how many written at a time */
#define IntsPerChunk 4 /* four 4-byte ints per chunk */
#define MaxZs 250 /* max microseconds to sleep */
int main() {
const char* pipeName = "./fifoChannel";
mkfifo(pipeName, 0666); /* read/write for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */
if (fd < 0) return -1; /* can't go on */
int i;
for (i = 0; i < MaxLoops; i++) { /* write MaxWrites times */
int j;
for (j = 0; j < ChunkSize; j++) { /* each time, write ChunkSize bytes */
int k;
int chunk[IntsPerChunk];
for (k = 0; k < IntsPerChunk; k++)
chunk[k] = rand();
write(fd, chunk, sizeof(chunk));
}
usleep((rand() % MaxZs) + 1); /* pause a bit for realism */
}
close(fd); /* close pipe: generates an end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */
printf("%i ints sent to the pipe.n", MaxLoops * ChunkSize * IntsPerChunk);
return 0;
}
上面的 fifoWriter
程序可以被總結為如下:
- 首先程序創建了一個命名管道用來寫入數據:
mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY);
其中的 pipeName
是備份文件的名字,傳遞給 mkfifo
作為它的第一個參數。接著命名管道通過我們熟悉的 open
函數調用被打開,而這個函數將會返回一個文件描述符。
- 在實現層面上,
fifoWriter
不會一次性將所有的數據都寫入,而是寫入一個塊,然後休息隨機數目的微秒時間,接著再循環往複。總的來說,有 768000 個 4 位元組整數值被寫入到命名管道中。 - 在關閉命名管道後,
fifoWriter
也將使用unlink
取消對該文件的連接。
close(fd); /* close pipe: generates end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */
一旦連接到管道的每個進程都執行了 unlink
操作後,系統將回收這些備份文件。在這個例子中,只有兩個這樣的進程 fifoWriter
和 fifoReader
,它們都做了 unlink
操作。
這個兩個程序應該在不同終端的相同工作目錄中執行。但是 fifoWriter
應該在 fifoReader
之前被啟動,因為需要 fifoWriter
去創建管道。然後 fifoReader
才能夠獲取到剛被創建的命名管道。
示例 3. fifoReader 程序
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
unsigned is_prime(unsigned n) { /* not pretty, but efficient */
if (n <= 3) return n > 1;
if (0 == (n % 2) || 0 == (n % 3)) return 0;
unsigned i;
for (i = 5; (i * i) <= n; i += 6)
if (0 == (n % i) || 0 == (n % (i + 2))) return 0;
return 1; /* found a prime! */
}
int main() {
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);
if (fd < 0) return -1; /* no point in continuing */
unsigned count = 0, total = 0, primes_count = 0;
while (1) {
int next;
int i;
ssize_t count = read(fd, &next, sizeof(int));
if (0 == count) break; /* end of stream */
else if (count == sizeof(int)) { /* read a 4-byte int value */
total++;
if (is_prime(next)) primes_count++;
}
}
close(fd); /* close pipe from read end */
unlink(file); /* unlink from the underlying file */
printf("Received ints: %u, primes: %un", total, primes_count);
return 0;
}
上面的 fifoReader
的內容可以總結為如下:
- 因為
fifoWriter
已經創建了命名管道,所以fifoReader
只需要利用標準的open
調用來通過備份文件來獲取到管道中的內容:
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);
這個文件的是以只讀打開的。
- 然後這個程序進入一個潛在的無限循環,在每次循環時,嘗試讀取 4 位元組的塊。
read
調用:
ssize_t count = read(fd, &next, sizeof(int));
返回 0 來暗示該流的結束。在這種情況下,fifoReader
跳出循環,關閉命名管道,並在終止前 unlink
備份文件。
- 在讀入 4 位元組整數後,
fifoReader
檢查這個數是否為質數。這個操作代表了一個生產級別的讀取器可能在接收到的位元組數據上執行的邏輯操作。在示例運行中,在接收到的 768000 個整數中有 37682 個質數。
重複運行示例, fifoReader
將成功地讀取 fifoWriter
寫入的所有位元組。這不是很讓人驚訝的。這兩個進程在相同的機器上執行,從而可以不用考慮網路相關的問題。命名管道是一個可信且高效的 IPC 機制,因而被廣泛使用。
下面是這兩個程序的輸出,它們在不同的終端中啟動,但處於相同的工作目錄:
% ./fifoWriter
768000 ints sent to the pipe.
###
% ./fifoReader
Received ints: 768000, primes: 37682
消息隊列
管道有著嚴格的先入先出行為:第一個被寫入的位元組將會第一個被讀,第二個寫入的位元組將第二個被讀,以此類推。消息隊列可以做出相同的表現,但它又足夠靈活,可以使得位元組塊可以不以先入先出的次序來接收。
正如它的名字所提示的那樣,消息隊列是一系列的消息,每個消息包含兩部分:
- 荷載,一個位元組序列(在 C 中是 char)
- 類型,以一個正整數值的形式給定,類型用來分類消息,為了更靈活的回收
看一下下面對一個消息隊列的描述,每個消息由一個整數類型標記:
+-+ +-+ +-+ +-+
sender--->|3|--->|2|--->|2|--->|1|--->receiver
+-+ +-+ +-+ +-+
在上面展示的 4 個消息中,標記為 1 的是開頭,即最接近接收端,然後另個標記為 2 的消息,最後接著一個標記為 3 的消息。假如按照嚴格的 FIFO 行為執行,消息將會以 1-2-2-3 這樣的次序被接收。但是消息隊列允許其他收取次序。例如,消息可以被接收方以 3-2-1-2 的次序接收。
mqueue
示例包含兩個程序,sender
將向消息隊列中寫入數據,而 receiver
將從這個隊列中讀取數據。這兩個程序都包含的頭文件 queue.h
如下所示:
示例 4. 頭文件 queue.h
#define ProjectId 123
#define PathName "queue.h" /* any existing, accessible file would do */
#define MsgLen 4
#define MsgCount 6
typedef struct {
long type; /* must be of type long */
char payload[MsgLen + 1]; /* bytes in the message */
} queuedMessage;
上面的頭文件定義了一個名為 queuedMessage
的結構類型,它帶有 payload
(位元組數組)和 type
(整數)這兩個域。該文件也定義了一些符號常數(使用 #define
語句),前兩個常數被用來生成一個 key
,而這個 key
反過來被用來獲取一個消息隊列的 ID。ProjectId
可以是任何正整數值,而 PathName
必須是一個存在的、可訪問的文件,在這個示例中,指的是文件 queue.h
。在 sender
和 receiver
中,它們都有的設定語句為:
key_t key = ftok(PathName, ProjectId); /* generate key */
int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */
ID qid
在效果上是消息隊列文件描述符的對應物。
示例 5. sender 程序
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
#include "queue.h"
void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /* EXIT_FAILURE */
}
int main() {
key_t key = ftok(PathName, ProjectId);
if (key < 0) report_and_exit("couldn't get key...");
int qid = msgget(key, 0666 | IPC_CREAT);
if (qid < 0) report_and_exit("couldn't get queue id...");
char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
int i;
for (i = 0; i < MsgCount; i++) {
/* build the message */
queuedMessage msg;
msg.type = types[i];
strcpy(msg.payload, payloads[i]);
/* send the message */
msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */
printf("%s sent as type %in", msg.payload, (int) msg.type);
}
return 0;
}
上面的 sender
程序將發送出 6 個消息,每兩個為一個類型:前兩個是類型 1,接著的連個是類型 2,最後的兩個為類型 3。發送的語句:
msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);
被配置為非阻塞的(IPC_NOWAIT
標誌),是因為這裡的消息體量上都很小。唯一的危險在於一個完整的序列將可能導致發送失敗,而這個例子不會。下面的 receiver
程序也將使用 IPC_NOWAIT
標誌來接收消息。
示例 6. receiver 程序
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include "queue.h"
void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /* EXIT_FAILURE */
}
int main() {
key_t key= ftok(PathName, ProjectId); /* key to identify the queue */
if (key < 0) report_and_exit("key not gotten...");
int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
if (qid < 0) report_and_exit("no access to queue...");
int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
int i;
for (i = 0; i < MsgCount; i++) {
queuedMessage msg; /* defined in queue.h */
if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
puts("msgrcv trouble...");
printf("%s received as type %in", msg.payload, (int) msg.type);
}
/** remove the queue **/
if (msgctl(qid, IPC_RMID, NULL) < 0) /* NULL = 'no flags' */
report_and_exit("trouble removing queue...");
return 0;
}
這個 receiver
程序不會創建消息隊列,儘管 API 儘管建議那樣。在 receiver
中,對
int qid = msgget(key, 0666 | IPC_CREAT);
的調用可能因為帶有 IPC_CREAT
標誌而具有誤導性,但是這個標誌的真實意義是如果需要就創建,否則直接獲取。sender
程序調用 msgsnd
來發送消息,而 receiver
調用 msgrcv
來接收它們。在這個例子中,sender
以 1-1-2-2-3-3 的次序發送消息,但 receiver
接收它們的次序為 3-1-2-1-3-2,這顯示消息隊列沒有被嚴格的 FIFO 行為所拘泥:
% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3
% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2
上面的輸出顯示 sender
和 receiver
可以在同一個終端中啟動。輸出也顯示消息隊列是持久的,即便 sender
進程在完成創建隊列、向隊列寫數據、然後退出的整個過程後,該隊列仍然存在。只有在 receiver
進程顯式地調用 msgctl
來移除該隊列,這個隊列才會消失:
if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */
總結
管道和消息隊列的 API 在根本上來說都是單向的:一個進程寫,然後另一個進程讀。當然還存在雙向命名管道的實現,但我認為這個 IPC 機制在它最為簡單的時候反而是最佳的。正如前面提到的那樣,消息隊列已經不大受歡迎了,儘管沒有找到什麼特別好的原因來解釋這個現象;而隊列仍然是 IPC 工具箱中的一個工具。這個快速的 IPC 工具箱之旅將以第 3 部分(通過套接字和信號來示例 IPC)來終結。
via: https://opensource.com/article/19/4/interprocess-communication-linux-channels
作者:Marty Kalin 選題:lujun9972 譯者:FSSlc 校對:wxy
本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive