全面教程:在 RxJS 中創建流
對大多數開發者來說,與 RxJS 的初次接觸是通過庫的形式,就像 Angular。一些函數會返回 流 ,要使用它們就得把注意力放在操作符上。
有些時候,混用響應式和非響應式代碼似乎很有用。然後大家就開始熱衷流的創造。不論是在編寫非同步代碼或者是數據處理時,流都是一個不錯的方案。
RxJS 提供很多方式來創建流。不管你遇到的是什麼情況,都會有一個完美的創建流的方式。你可能根本用不上它們,但了解它們可以節省你的時間,讓你少碼一些代碼。
我把所有可能的方法,按它們的主要目的,放在四個分類當中:
- 流式化現有數據
- 生成數據
- 使用現有 API 進行交互
- 選擇現有的流,並結合起來
注意:示例用的是 RxJS 6,可能會以前的版本有所不同。已知的區別是你導入函數的方式不同了。
RxJS 6
import {of, from} from 'rxjs';
of(...);
from(...);
RxJS < 6
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/from';
Observable.of(...);
Observable.from(...);
//或
import { of } from 'rxjs/observable/of';
import { from } from 'rxjs/observable/from';
of(...);
from(...);
流的圖示中的標記:
|
表示流結束了X
表示流出現錯誤並被終結...
表示流的走向不定
流式化已有數據
你有一些數據,想把它們放到流中。有三種方式,並且都允許你把調度器當作最後一個參數傳入(你如果想深入了解調度器,可以看看我的 上一篇文章)。這些生成的流都是靜態的。
of
如果只有一個或者一些不同的元素,使用 of
:
of(1,2,3)
.subscribe();
// 結果
// 1 2 3 |
from
如果有一個數組或者 可迭代的對象 ,而且你想要其中的所有元素髮送到流中,使用 from
。你也可以用它來把一個 promise 對象變成可觀測的。
const foo = [1,2,3];
from(foo)
.subscribe();
// 結果
// 1 2 3 |
pairs
流式化一個對象的鍵/值對。用這個對象表示字典時特別有用。
const foo = { a: 1, b: 2};
pairs(foo)
.subscribe();
// 結果
// [a,1] [b,2] |
那麼其他的數據結構呢?
也許你的數據存儲在自定義的結構中,而它又沒有實現 可迭代的對象 介面,又或者說你的結構是遞歸的、樹狀的。也許下面某種選擇適合這些情況:
- 先將數據提取到數組裡
- 使用下一節將會講到的
generate
函數,遍歷所有數據 - 創建一個自定義流(見下一節)
- 創建一個迭代器
稍後會講到選項 2 和 3 ,因此這裡的重點是創建一個迭代器。我們可以對一個 可迭代的對象 調用 from
創建一個流。 可迭代的對象 是一個對象,可以產生一個迭代器(如果你對細節感興趣,參考 這篇 mdn 文章)。
創建一個迭代器的簡單方式是 生成函數 。當你調用一個生成函數時,它返回一個對象,該對象同時遵循 可迭代的對象 介面和 迭代器 介面。
// 自定義的數據結構
class List {
add(element) ...
get(index) ...
get size() ...
...
}
function* listIterator(list) {
for (let i = 0; i<list.size; i++) {
yield list.get(i);
}
}
const myList = new List();
myList.add(1);
myList.add(3);
from(listIterator(myList))
.subscribe(console.log);
// 結果
// 1 3 |
調用 listIterator
函數時,返回值是一個 可迭代的對象 / 迭代器 。函數裡面的代碼在調用 subscribe
前不會執行。
生成數據
你知道要發送哪些數據,但想(或者必須)動態生成它。所有函數的最後一個參數都可以用來接收一個調度器。他們產生靜態的流。
範圍(range
)
從初始值開始,發送一系列數字,直到完成了指定次數的迭代。
range(10, 2) // 從 10 開始,發送兩個值
.subscribe();
// 結果
// 10 11 |
間隔(interval
) / 定時器(timer
)
有點像範圍,但定時器是周期性的發送累加的數字(就是說,不是立即發送)。兩者的區別在於在於定時器允許你為第一個元素設定一個延遲。也可以只產生一個值,只要不指定周期。
interval(1000) // 每 1000ms = 1 秒 發送數據
.subscribe()
// 結果
// 0 1 2 3 4 ...
delay(5000, 1000) // 和上面相同,在開始前先等待 5000ms
delay(5000)
.subscribe(i => console.log("foo");
// 5 秒後列印 foo
大多數定時器將會用來周期性的處理數據:
interval(10000).pipe(
flatMap(i => fetch("https://server/stockTicker")
).subscribe(updateChart)
這段代碼每 10 秒獲取一次數據,更新屏幕。
生成(generate
)
這是個更加複雜的函數,允許你發送一系列任意類型的對象。它有一些重載,這裡你看到的是最有意思的部分:
generate(
0, // 從這個值開始
x => x < 10, // 條件:只要值小於 10,就一直發送
x => x*2 // 迭代:前一個值加倍
).subscribe();
// 結果
// 1 2 4 8 |
你也可以用它來迭代值,如果一個結構沒有實現 可迭代的對象 介面。我們用前面的列表例子來進行演示:
const myList = new List();
myList.add(1);
myList.add(3);
generate(
0, // 從這個值開始
i => i < list.size, // 條件:發送數據,直到遍歷完整個列表
i => ++i, // 迭代:獲取下一個索引
i => list.get(i) // 選擇器:從列表中取值
).subscribe();
// 結果
// 1 3 |
如你所見,我添加了另一個參數:選擇器。它和 map
操作符作用類似,將生成的值轉換為更有用的東西。
空的流
有時候你要傳遞或返回一個不用發送任何數據的流。有三個函數分別用於不同的情況。你可以給這三個函數傳遞調度器。empty
和 throwError
接收一個調度器參數。
empty
創建一個空的流,一個值也不發送。
empty()
.subscribe();
// 結果
// |
never
創建一個永遠不會結束的流,仍然不發送值。
never()
.subscribe();
// 結果
// ...
throwError
創建一個流,流出現錯誤,不發送數據。
throwError('error')
.subscribe();
// 結果
// X
掛鉤已有的 API
不是所有的庫和所有你之前寫的代碼使用或者支持流。幸運的是 RxJS 提供函數用來橋接非響應式和響應式代碼。這一節僅僅討論 RxJS 為橋接代碼提供的模版。
你可能還對這篇出自 Ben Lesh 的 全面的文章 感興趣,這篇文章講了幾乎所有能與 promises 交互操作的方式。
from
我們已經用過它,把它列在這裡是因為,它可以封裝一個含有 observable 對象的 promise 對象。
from(new Promise(resolve => resolve(1)))
.subscribe();
// 結果
// 1 |
fromEvent
fromEvent 為 DOM 元素添加一個事件監聽器,我確定你知道這個。但你可能不知道的是,也可以通過其它類型來添加事件監聽器,例如,一個 jQuery 對象。
const element = $('#fooButton'); // 從 DOM 元素中創建一個 jQuery 對象
from(element, 'click')
.subscribe();
// 結果
// clickEvent ...
fromEventPattern
要理解為什麼有 fromEvent 了還需要 fromEventPattern,我們得先理解 fromEvent 是如何工作的。看這段代碼:
from(document, 'click')
.subscribe();
這告訴 RxJS 我們想要監聽 document 中的點擊事件。在提交過程中,RxJS 發現 document 是一個 EventTarget 類型,因此它可以調用它的 addEventListener
方法。如果我們傳入的是一個 jQuery 對象而非 document,那麼 RxJs 知道它得調用 on 方法。
這個例子用的是 fromEventPattern ,和 fromEvent 的工作基本上一樣:
function addClickHandler(handler) {
document.addEventListener('click', handler);
}
function removeClickHandler(handler) {
document.removeEventListener('click', handler);
}
fromEventPattern(
addClickHandler,
removeClickHandler,
)
.subscribe(console.log);
// 等效於
fromEvent(document, 'click')
RxJS 自動創建實際的監聽器( handler )你的工作是添加或者移除監聽器。fromEventPattern 的目的基本上是告訴 RxJS 如何註冊和移除事件監聽器。
現在想像一下你使用了一個庫,你可以調用一個叫做 registerListener 的方法。我們不能再用 fromEvent,因為它並不知道該怎麼處理這個對象。
const listeners = [];
class Foo {
registerListener(listener) {
listeners.push(listener);
}
emit(value) {
listeners.forEach(listener => listener(value));
}
}
const foo = new Foo();
fromEventPattern(listener => foo.registerListener(listener))
.subscribe();
foo.emit(1);
// 結果
// 1 ...
當我們調用 foo.emit(1)
時,RxJS 中的監聽器將被調用,然後它就能把值發送到流中。
你也可以用它來監聽多個事件類型,或者結合所有可以通過回調進行通訊的 API,例如,WebWorker API:
const myWorker = new Worker('worker.js');
fromEventPattern(
handler => { myWorker.onmessage = handler },
handler => { myWorker.onmessage = undefined }
)
.subscribe();
// 結果
// workerMessage ...
bindCallback
它和 fromEventPattern 相似,但它能用於單個值。就在回調函數被調用時,流就結束了。用法當然也不一樣 —— 你可以用 bindCallBack 封裝函數,然後它就會在調用時魔術般的返回一個流:
function foo(value, callback) {
callback(value);
}
// 沒有流
foo(1, console.log); //prints 1 in the console
// 有流
const reactiveFoo = bindCallback(foo);
// 當我們調用 reactiveFoo 時,它返回一個 observable 對象
reactiveFoo(1)
.subscribe(console.log); // 在控制台列印 1
// 結果
// 1 |
websocket
是的,你完全可以創建一個 websocket 連接然後把它暴露給流:
import { webSocket } from 'rxjs/webSocket';
let socket$ = webSocket('ws://localhost:8081');
// 接收消息
socket$.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete') * );
// 發送消息
socket$.next(JSON.stringify({ op: 'hello' }));
把 websocket 功能添加到你的應用中真的很簡單。websocket 創建一個 subject。這意味著你可以訂閱它,通過調用 next
來獲得消息和發送消息。
ajax
如你所知:類似於 websocket,提供 AJAX 查詢的功能。你可能用了一個帶有 AJAX 功能的庫或者框架。或者你沒有用,那麼我建議使用 fetch(或者必要的話用 polyfill),把返回的 promise 封裝到一個 observable 對象中(參考稍後會講到的 defer
函數)。
定製流
有時候已有的函數用起來並不是足夠靈活。或者你需要對訂閱有更強的控制。
主題(Subject
)
Subject
是一個特殊的對象,它使得你的能夠把數據發送到流中,並且能夠控制數據。Subject
本身就是一個可觀察對象,但如果你想要把流暴露給其它代碼,建議你使用 asObservable
方法。這樣你就不能意外調用原始方法。
const subject = new Subject();
const observable = subject.asObservable();
observable.subscribe();
subject.next(1);
subject.next(2);
subject.complete();
// 結果
// 1 2 |
注意在訂閱前發送的值將會「丟失」:
const subject = new Subject();
const observable = subject.asObservable();
subject.next(1);
observable.subscribe(console.log);
subject.next(2);
subject.complete();
// 結果
// 2
除了常規的 Subject
,RxJS 還提供了三種特殊的版本。
AsyncSubject
在結束後只發送最後的一個值。
const subject = new AsyncSubject();
const observable = subject.asObservable();
observable.subscribe(console.log);
subject.next(1);
subject.next(2);
subject.complete();
// 輸出
// 2
BehaviorSubject
使得你能夠提供一個(默認的)值,如果當前沒有其它值發送的話,這個值會被發送給每個訂閱者。否則訂閱者收到最後一個發送的值。
const subject = new BehaviorSubject(1);
const observable = subject.asObservable();
const subscription1 = observable.subscribe(console.log);
subject.next(2);
subscription1.unsubscribe();
// 輸出
// 1
// 2
const subscription2 = observable.subscribe(console.log);
// 輸出
// 2
ReplaySubject
存儲一定數量、或一定時間或所有的發送過的值。所有新的訂閱者將會獲得所有存儲了的值。
const subject = new ReplaySubject();
const observable = subject.asObservable();
subject.next(1);
observable.subscribe(console.log);
subject.next(2);
subject.complete();
// 輸出
// 1
// 2
你可以在 ReactiveX 文檔(它提供了一些其它的連接) 裡面找到更多關於 Subject
的信息。Ben Lesh 在 On The Subject Of Subjects 上面提供了一些關於 Subject
的理解,Nicholas Jamieson 在 in RxJS: Understanding Subjects 上也提供了一些理解。
可觀察對象
你可以簡單地用 new 操作符創建一個可觀察對象。通過你傳入的函數,你可以控制流,只要有人訂閱了或者它接收到一個可以當成 Subject
使用的觀察者,這個函數就會被調用,比如,調用 next
、complet
和 error
。
讓我們回顧一下列表示例:
const myList = new List();
myList.add(1);
myList.add(3);
new Observable(observer => {
for (let i = 0; i<list.size; i++) {
observer.next(list.get(i));
}
observer.complete();
})
.subscribe();
// 結果
// 1 3 |
這個函數可以返回一個 unsubcribe
函數,當有訂閱者取消訂閱時這個函數就會被調用。你可以用它來清楚或者執行一些收尾操作。
new Observable(observer => {
// 流式化
return () => {
//clean up
};
})
.subscribe();
繼承可觀察對象
在有可用的操作符前,這是一種實現自定義操作符的方式。RxJS 在內部擴展了 可觀察對象 。Subject
就是一個例子,另一個是 publisher
操作符。它返回一個 ConnectableObservable
對象,該對象提供額外的方法 connect
。
實現 Subscribable
介面
有時候你已經用一個對象來保存狀態,並且能夠發送值。如果你實現了 Subscribable
介面,你可以把它轉換成一個可觀察對象。Subscribable
介面中只有一個 subscribe
方法。
interface Subscribable<T> { subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Unsubscribable}
結合和選擇現有的流
知道怎麼創建一個獨立的流還不夠。有時候你有好幾個流但其實只需要一個。有些函數也可作為操作符,所以我不打算在這裡深入展開。推薦看看 Max NgWizard K 所寫的一篇 文章,它還包含一些有趣的動畫。
還有一個建議:你可以通過拖拽元素的方式互動式的使用結合操作,參考 RxMarbles。
ObservableInput 類型
期望接收流的操作符和函數通常不單獨和可觀察對象一起工作。相反,它們實際上期望的參數類型是 ObservableInput,定義如下:
type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;
這意味著你可以傳遞一個 promises 或者數組卻不需要事先把他們轉換成可觀察對象。
defer
主要的目的是把一個 observable 對象的創建延遲(defer
)到有人想要訂閱的時間。在以下情況,這很有用:
- 創建可觀察對象的開銷較大
- 你想要給每個訂閱者新的可觀察對象
- 你想要在訂閱時候選擇不同的可觀察對象
- 有些代碼必須在訂閱之後執行
最後一點包含了一個並不起眼的用例:Promises(defer
也可以返回一個 promise 對象)。看看這個用到了 fetch API 的例子:
function getUser(id) {
console.log("fetching data");
return fetch(`https://server/user/${id}`);
}
const userPromise = getUser(1);
console.log("I don't want that request now");
// 其它地方
userPromise.then(response => console.log("done");
// 輸出
// fetching data
// I don't want that request now
// done
只要流在你訂閱的時候執行了,promise 就會立即執行。我們調用 getUser
的瞬間,就發送了一個請求,哪怕我們這個時候不想發送請求。當然,我們可以使用 from
來把一個 promise 對象轉換成可觀察對象,但我們傳遞的 promise 對象已經創建或執行了。defer
讓我們能夠等到訂閱才發送這個請求:
const user$ = defer(() => getUser(1));
console.log("I don't want that request now");
// 其它地方
user$.subscribe(response => console.log("done");
// 輸出
// I don't want that request now
// fetching data
// done
iif
iif
包含了一個關於 defer
的特殊用例:在訂閱時選擇兩個流中的一個:
iif(
() => new Date().getHours() < 12,
of("AM"),
of("PM")
)
.subscribe();
// 結果
// AM before noon, PM afterwards
引用該文檔:
onErrorResumeNext
開啟第一個流並且在失敗的時候繼續進行下一個流。錯誤被忽略掉。
const stream1$ = of(1, 2).pipe(
tap(i => { if(i>1) throw 'error'}) //fail after first element
);
const stream2$ = of(3,4);
onErrorResumeNext(stream1$, stream2$)
.subscribe(console.log);
// 結果
// 1 3 4 |
如果你有多個 web 服務,這就很有用了。萬一主伺服器開啟失敗,那麼備份的服務就能自動調用。
forkJoin
它讓流並行運行,當流結束時發送存在數組中的最後的值。由於每個流只有最後一個值被發送,它一般用在只發送一個元素的流的情況,就像 HTTP 請求。你讓請求並行運行,在所有流收到響應時執行某些任務。
function handleResponses([user, account]) {
// 執行某些任務
}
forkJoin(
fetch("https://server/user/1"),
fetch("https://server/account/1")
)
.subscribe(handleResponses);
merge / concat
發送每一個從可觀察對象源中發出的值。
merge
接收一個參數,讓你定義有多少流能被同時訂閱。默認是無限制的。設為 1 就意味著監聽一個源流,在它結束的時候訂閱下一個。由於這是一個常見的場景,RxJS 為你提供了一個顯示的函數:concat
。
merge(
interval(1000).pipe(mapTo("Stream 1"), take(2)),
interval(1200).pipe(mapTo("Stream 2"), take(2)),
timer(0, 1000).pipe(mapTo("Stream 3"), take(2)),
2 //two concurrent streams
)
.subscribe();
// 只訂閱流 1 和流 2
// 輸出
// Stream 1 -> after 1000ms
// Stream 2 -> after 1200ms
// Stream 1 -> after 2000ms
// 流 1 結束後,開始訂閱流 3
// 輸出
// Stream 3 -> after 0 ms
// Stream 2 -> after 400 ms (2400ms from beginning)
// Stream 3 -> after 1000ms
merge(
interval(1000).pipe(mapTo("Stream 1"), take(2)),
interval(1200).pipe(mapTo("Stream 2"), take(2))
1
)
// 等效於
concat(
interval(1000).pipe(mapTo("Stream 1"), take(2)),
interval(1200).pipe(mapTo("Stream 2"), take(2))
)
// 輸出
// Stream 1 -> after 1000ms
// Stream 1 -> after 2000ms
// Stream 2 -> after 3200ms
// Stream 2 -> after 4400ms
zip / combineLatest
merge
和 concat
一個接一個的發送所有從源流中讀到的值,而 zip
和 combineLatest
是把每個流中的一個值結合起來一起發送。zip
結合所有源流中發送的第一個值。如果流的內容相關聯,那麼這就很有用。
zip(
interval(1000),
interval(1200),
)
.subscribe();
// 結果
// [0, 0] [1, 1] [2, 2] ...
combineLatest
與之類似,但結合的是源流中發送的最後一個值。直到所有源流至少發送一個值之後才會觸發事件。這之後每次源流發送一個值,它都會把這個值與其他流發送的最後一個值結合起來。
combineLatest(
interval(1000),
interval(1200),
)
.subscribe();
// 結果
// [0, 0] [1, 0] [1, 1] [2, 1] ...
兩個函數都讓允許傳遞一個選擇器函數,把元素結合成其它對象而不是數組:
zip(
interval(1000),
interval(1200),
(e1, e2) -> e1 + e2
)
.subscribe();
// 結果
// 0 2 4 6 ...
race
選擇第一個發送數據的流。產生的流基本是最快的。
race(
interval(1000),
of("foo")
)
.subscribe();
// 結果
// foo |
由於 of
立即產生一個值,因此它是最快的流,然而這個流就被選中了。
總結
已經有很多創建可觀察對象的方式了。如果你想要創造響應式的 API 或者想用響應式的 API 結合傳統 API,那麼了解這些方法很重要。
我已經向你展示了所有可用的方法,但它們其實還有很多內容可以講。如果你想更加深入地了解,我極力推薦你查閱 文檔 或者閱讀相關文章。
RxViz 是另一種值得了解的有意思的方式。你編寫 RxJS 代碼,產生的流可以用圖形或動畫進行顯示。
via: https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a
作者:Oliver Flaggl 譯者:BriFuture 校對:wxy
本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive