Linux中國

全面教程:在 RxJS 中創建流

對大多數開發者來說,與 RxJS 的初次接觸是通過庫的形式,就像 Angular。一些函數會返回 stream ,要使用它們就得把注意力放在操作符上。

有些時候,混用響應式和非響應式代碼似乎很有用。然後大家就開始熱衷流的創造。不論是在編寫非同步代碼或者是數據處理時,流都是一個不錯的方案。

RxJS 提供很多方式來創建流。不管你遇到的是什麼情況,都會有一個完美的創建流的方式。你可能根本用不上它們,但了解它們可以節省你的時間,讓你少碼一些代碼。

我把所有可能的方法,按它們的主要目的,放在四個分類當中:

  • 流式化現有數據
  • 生成數據
  • 使用現有 API 進行交互
  • 選擇現有的流,並結合起來

注意:示例用的是 RxJS 6,可能會以前的版本有所不同。已知的區別是你導入函數的方式不同了。

RxJS 6

import {of, from} from 'rxjs';

of(...);
from(...);

RxJS < 6

import { Observable } from &apos;rxjs/Observable&apos;;
import &apos;rxjs/add/observable/of&apos;;
import &apos;rxjs/add/observable/from&apos;;

Observable.of(...);
Observable.from(...);

//或

import { of } from &apos;rxjs/observable/of&apos;;
import { from } from &apos;rxjs/observable/from&apos;;

of(...);
from(...);

流的圖示中的標記:

  • | 表示流結束了
  • X 表示流出現錯誤並被終結
  • ... 表示流的走向不定

流式化已有數據

你有一些數據,想把它們放到流中。有三種方式,並且都允許你把調度器當作最後一個參數傳入(你如果想深入了解調度器,可以看看我的 上一篇文章)。這些生成的流都是靜態的。

of

如果只有一個或者一些不同的元素,使用 of

of(1,2,3)
  .subscribe();
// 結果
// 1 2 3 |

from

如果有一個數組或者 可迭代的對象 ,而且你想要其中的所有元素髮送到流中,使用 from。你也可以用它來把一個 promise 對象變成可觀測的。

const foo = [1,2,3];

from(foo)
  .subscribe();
// 結果
// 1 2 3 |

pairs

流式化一個對象的鍵/值對。用這個對象表示字典時特別有用。

const foo = { a: 1, b: 2};

pairs(foo)
  .subscribe();
// 結果
// [a,1] [b,2] |

那麼其他的數據結構呢?

也許你的數據存儲在自定義的結構中,而它又沒有實現 可迭代的對象 介面,又或者說你的結構是遞歸的、樹狀的。也許下面某種選擇適合這些情況:

  1. 先將數據提取到數組裡
  2. 使用下一節將會講到的 generate 函數,遍歷所有數據
  3. 創建一個自定義流(見下一節)
  4. 創建一個迭代器

稍後會講到選項 2 和 3 ,因此這裡的重點是創建一個迭代器。我們可以對一個 可迭代的對象 調用 from 創建一個流。 可迭代的對象 是一個對象,可以產生一個迭代器(如果你對細節感興趣,參考 這篇 mdn 文章)。

創建一個迭代器的簡單方式是 生成函數 generator function 。當你調用一個生成函數時,它返回一個對象,該對象同時遵循 可迭代的對象 介面和 迭代器 介面。

// 自定義的數據結構
class List {
  add(element) ...
  get(index) ...
  get size() ...
  ...
}

function* listIterator(list) {
  for (let i = 0; i<list.size; i++) {
    yield list.get(i);
  }
}

const myList = new List();
myList.add(1);
myList.add(3);

from(listIterator(myList))
  .subscribe(console.log);
// 結果
// 1 3 |    

調用 listIterator 函數時,返回值是一個 可迭代的對象 / 迭代器 。函數裡面的代碼在調用 subscribe 前不會執行。

生成數據

你知道要發送哪些數據,但想(或者必須)動態生成它。所有函數的最後一個參數都可以用來接收一個調度器。他們產生靜態的流。

範圍(range

從初始值開始,發送一系列數字,直到完成了指定次數的迭代。

range(10, 2)  // 從 10 開始,發送兩個值
  .subscribe();
// 結果
// 10 11 |

間隔(interval) / 定時器(timer

有點像範圍,但定時器是周期性的發送累加的數字(就是說,不是立即發送)。兩者的區別在於在於定時器允許你為第一個元素設定一個延遲。也可以只產生一個值,只要不指定周期。

interval(1000) // 每 1000ms = 1 秒 發送數據
  .subscribe()
// 結果
// 0  1  2  3  4 ...
delay(5000, 1000) // 和上面相同,在開始前先等待 5000ms

delay(5000)
.subscribe(i => console.log("foo");
// 5 秒後列印 foo

大多數定時器將會用來周期性的處理數據:

interval(10000).pipe(
  flatMap(i => fetch("https://server/stockTicker")
).subscribe(updateChart)

這段代碼每 10 秒獲取一次數據,更新屏幕。

生成(generate

這是個更加複雜的函數,允許你發送一系列任意類型的對象。它有一些重載,這裡你看到的是最有意思的部分:

generate(
  0,           // 從這個值開始
  x => x < 10, // 條件:只要值小於 10,就一直發送
  x => x*2     // 迭代:前一個值加倍
).subscribe();
// 結果
// 1 2 4 8 |

你也可以用它來迭代值,如果一個結構沒有實現 可迭代的對象 介面。我們用前面的列表例子來進行演示:

const myList = new List();
myList.add(1);
myList.add(3);

generate(
  0,                  // 從這個值開始
  i => i < list.size, // 條件:發送數據,直到遍歷完整個列表
  i => ++i,           // 迭代:獲取下一個索引
  i => list.get(i)    // 選擇器:從列表中取值
).subscribe();
// 結果
// 1 3 |

如你所見,我添加了另一個參數:選擇器。它和 map 操作符作用類似,將生成的值轉換為更有用的東西。

空的流

有時候你要傳遞或返回一個不用發送任何數據的流。有三個函數分別用於不同的情況。你可以給這三個函數傳遞調度器。emptythrowError 接收一個調度器參數。

empty

創建一個空的流,一個值也不發送。

empty()
  .subscribe();
// 結果
// |

never

創建一個永遠不會結束的流,仍然不發送值。

never()
  .subscribe();
// 結果
// ...

throwError

創建一個流,流出現錯誤,不發送數據。

throwError(&apos;error&apos;)
  .subscribe();
// 結果
// X

掛鉤已有的 API

不是所有的庫和所有你之前寫的代碼使用或者支持流。幸運的是 RxJS 提供函數用來橋接非響應式和響應式代碼。這一節僅僅討論 RxJS 為橋接代碼提供的模版。

你可能還對這篇出自 Ben Lesh全面的文章 感興趣,這篇文章講了幾乎所有能與 promises 交互操作的方式。

from

我們已經用過它,把它列在這裡是因為,它可以封裝一個含有 observable 對象的 promise 對象。

from(new Promise(resolve => resolve(1)))
  .subscribe();
// 結果
// 1 |

fromEvent

fromEvent 為 DOM 元素添加一個事件監聽器,我確定你知道這個。但你可能不知道的是,也可以通過其它類型來添加事件監聽器,例如,一個 jQuery 對象。

const element = $(&apos;#fooButton&apos;); // 從 DOM 元素中創建一個 jQuery 對象

from(element, &apos;click&apos;)
  .subscribe();
// 結果
// clickEvent ...

fromEventPattern

要理解為什麼有 fromEvent 了還需要 fromEventPattern,我們得先理解 fromEvent 是如何工作的。看這段代碼:

from(document, &apos;click&apos;)
  .subscribe();

這告訴 RxJS 我們想要監聽 document 中的點擊事件。在提交過程中,RxJS 發現 document 是一個 EventTarget 類型,因此它可以調用它的 addEventListener 方法。如果我們傳入的是一個 jQuery 對象而非 document,那麼 RxJs 知道它得調用 on 方法。

這個例子用的是 fromEventPattern ,和 fromEvent 的工作基本上一樣:

function addClickHandler(handler) {
  document.addEventListener(&apos;click&apos;, handler);
}

function removeClickHandler(handler) {
  document.removeEventListener(&apos;click&apos;, handler);
}

fromEventPattern(
  addClickHandler,
  removeClickHandler,
)
.subscribe(console.log);

// 等效於
fromEvent(document, &apos;click&apos;)

RxJS 自動創建實際的監聽器( handler )你的工作是添加或者移除監聽器。fromEventPattern 的目的基本上是告訴 RxJS 如何註冊和移除事件監聽器。

現在想像一下你使用了一個庫,你可以調用一個叫做 registerListener 的方法。我們不能再用 fromEvent,因為它並不知道該怎麼處理這個對象。

const listeners = [];

class Foo {
  registerListener(listener) {
    listeners.push(listener);
  }

  emit(value) {
    listeners.forEach(listener => listener(value));
  }
}

const foo = new Foo();

fromEventPattern(listener => foo.registerListener(listener))
  .subscribe();

foo.emit(1);
// 結果
// 1 ...

當我們調用 foo.emit(1) 時,RxJS 中的監聽器將被調用,然後它就能把值發送到流中。

你也可以用它來監聽多個事件類型,或者結合所有可以通過回調進行通訊的 API,例如,WebWorker API:

const myWorker = new Worker(&apos;worker.js&apos;);

fromEventPattern(
  handler => { myWorker.onmessage = handler },
  handler => { myWorker.onmessage = undefined }
)
.subscribe();
// 結果
// workerMessage ...

bindCallback

它和 fromEventPattern 相似,但它能用於單個值。就在回調函數被調用時,流就結束了。用法當然也不一樣 —— 你可以用 bindCallBack 封裝函數,然後它就會在調用時魔術般的返回一個流:

function foo(value, callback) {
  callback(value);
}

// 沒有流
foo(1, console.log); //prints 1 in the console

// 有流
const reactiveFoo = bindCallback(foo); 
// 當我們調用 reactiveFoo 時,它返回一個 observable 對象

reactiveFoo(1)
  .subscribe(console.log); // 在控制台列印 1
// 結果
// 1 |

websocket

是的,你完全可以創建一個 websocket 連接然後把它暴露給流:

import { webSocket } from &apos;rxjs/webSocket&apos;; 

let socket$ = webSocket(&apos;ws://localhost:8081&apos;);

// 接收消息
socket$.subscribe(
  (msg) => console.log(&apos;message received: &apos; + msg),
  (err) => console.log(err),
  () => console.log(&apos;complete&apos;) * );

// 發送消息
socket$.next(JSON.stringify({ op: &apos;hello&apos; }));

把 websocket 功能添加到你的應用中真的很簡單。websocket 創建一個 subject。這意味著你可以訂閱它,通過調用 next 來獲得消息和發送消息。

ajax

如你所知:類似於 websocket,提供 AJAX 查詢的功能。你可能用了一個帶有 AJAX 功能的庫或者框架。或者你沒有用,那麼我建議使用 fetch(或者必要的話用 polyfill),把返回的 promise 封裝到一個 observable 對象中(參考稍後會講到的 defer 函數)。

定製流

有時候已有的函數用起來並不是足夠靈活。或者你需要對訂閱有更強的控制。

主題(Subject

Subject 是一個特殊的對象,它使得你的能夠把數據發送到流中,並且能夠控制數據。Subject 本身就是一個可觀察對象,但如果你想要把流暴露給其它代碼,建議你使用 asObservable 方法。這樣你就不能意外調用原始方法。

const subject = new Subject();
const observable = subject.asObservable();

observable.subscribe();

subject.next(1);
subject.next(2);
subject.complete();
// 結果
// 1 2 |

注意在訂閱前發送的值將會「丟失」:

const subject = new Subject();
const observable = subject.asObservable();

subject.next(1);

observable.subscribe(console.log);

subject.next(2);
subject.complete();
// 結果
// 2

除了常規的 Subject,RxJS 還提供了三種特殊的版本。

AsyncSubject 在結束後只發送最後的一個值。

const subject = new AsyncSubject();
const observable = subject.asObservable();

observable.subscribe(console.log);

subject.next(1);
subject.next(2);
subject.complete();
// 輸出
// 2

BehaviorSubject 使得你能夠提供一個(默認的)值,如果當前沒有其它值發送的話,這個值會被發送給每個訂閱者。否則訂閱者收到最後一個發送的值。

const subject = new BehaviorSubject(1);
const observable = subject.asObservable();

const subscription1 = observable.subscribe(console.log);

subject.next(2);
subscription1.unsubscribe();
// 輸出
// 1
// 2
const subscription2 = observable.subscribe(console.log);

// 輸出
// 2

ReplaySubject 存儲一定數量、或一定時間或所有的發送過的值。所有新的訂閱者將會獲得所有存儲了的值。

const subject = new ReplaySubject();
const observable = subject.asObservable();

subject.next(1);

observable.subscribe(console.log);

subject.next(2);
subject.complete();
// 輸出
// 1
// 2

你可以在 ReactiveX 文檔(它提供了一些其它的連接) 裡面找到更多關於 Subject 的信息。Ben LeshOn The Subject Of Subjects 上面提供了一些關於 Subject 的理解,Nicholas Jamiesonin RxJS: Understanding Subjects 上也提供了一些理解。

可觀察對象

你可以簡單地用 new 操作符創建一個可觀察對象。通過你傳入的函數,你可以控制流,只要有人訂閱了或者它接收到一個可以當成 Subject 使用的觀察者,這個函數就會被調用,比如,調用 nextcompleterror

讓我們回顧一下列表示例:

const myList = new List();
myList.add(1);
myList.add(3);

new Observable(observer => {
  for (let i = 0; i<list.size; i++) {
    observer.next(list.get(i));
  }

  observer.complete();
})
.subscribe();
// 結果
// 1 3 |

這個函數可以返回一個 unsubcribe 函數,當有訂閱者取消訂閱時這個函數就會被調用。你可以用它來清楚或者執行一些收尾操作。

new Observable(observer => {
  // 流式化

  return () => {
                 //clean up
               };
})
.subscribe();

繼承可觀察對象

在有可用的操作符前,這是一種實現自定義操作符的方式。RxJS 在內部擴展了 可觀察對象Subject 就是一個例子,另一個是 publisher 操作符。它返回一個 ConnectableObservable 對象,該對象提供額外的方法 connect

實現 Subscribable 介面

有時候你已經用一個對象來保存狀態,並且能夠發送值。如果你實現了 Subscribable 介面,你可以把它轉換成一個可觀察對象。Subscribable 介面中只有一個 subscribe 方法。

interface Subscribable<T> {  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Unsubscribable}

結合和選擇現有的流

知道怎麼創建一個獨立的流還不夠。有時候你有好幾個流但其實只需要一個。有些函數也可作為操作符,所以我不打算在這裡深入展開。推薦看看 Max NgWizard K 所寫的一篇 文章,它還包含一些有趣的動畫。

還有一個建議:你可以通過拖拽元素的方式互動式的使用結合操作,參考 RxMarbles

ObservableInput 類型

期望接收流的操作符和函數通常不單獨和可觀察對象一起工作。相反,它們實際上期望的參數類型是 ObservableInput,定義如下:

type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;

這意味著你可以傳遞一個 promises 或者數組卻不需要事先把他們轉換成可觀察對象。

defer

主要的目的是把一個 observable 對象的創建延遲(defer)到有人想要訂閱的時間。在以下情況,這很有用:

  • 創建可觀察對象的開銷較大
  • 你想要給每個訂閱者新的可觀察對象
  • 你想要在訂閱時候選擇不同的可觀察對象
  • 有些代碼必須在訂閱之後執行

最後一點包含了一個並不起眼的用例:Promises(defer 也可以返回一個 promise 對象)。看看這個用到了 fetch API 的例子:

function getUser(id) {
  console.log("fetching data");
  return fetch(`https://server/user/${id}`);
}

const userPromise = getUser(1);
console.log("I don&apos;t want that request now");

// 其它地方
userPromise.then(response => console.log("done");
// 輸出
// fetching data
// I don&apos;t want that request now
// done

只要流在你訂閱的時候執行了,promise 就會立即執行。我們調用 getUser 的瞬間,就發送了一個請求,哪怕我們這個時候不想發送請求。當然,我們可以使用 from 來把一個 promise 對象轉換成可觀察對象,但我們傳遞的 promise 對象已經創建或執行了。defer 讓我們能夠等到訂閱才發送這個請求:

const user$ = defer(() => getUser(1));

console.log("I don&apos;t want that request now");

// 其它地方
user$.subscribe(response => console.log("done");
// 輸出
// I don&apos;t want that request now
// fetching data
// done

iif

iif 包含了一個關於 defer 的特殊用例:在訂閱時選擇兩個流中的一個:

iif(
  () => new Date().getHours() < 12,
  of("AM"),
  of("PM")
)
.subscribe();
// 結果
// AM before noon, PM afterwards

引用該文檔:

實際上 iif 能夠輕鬆地用 defer 實現,它僅僅是出於方便和可讀性的目的。

onErrorResumeNext

開啟第一個流並且在失敗的時候繼續進行下一個流。錯誤被忽略掉。

const stream1$ = of(1, 2).pipe(
  tap(i => { if(i>1) throw &apos;error&apos;}) //fail after first element
);

const stream2$ = of(3,4);

onErrorResumeNext(stream1$, stream2$)
  .subscribe(console.log);
// 結果
// 1 3 4 |

如果你有多個 web 服務,這就很有用了。萬一主伺服器開啟失敗,那麼備份的服務就能自動調用。

forkJoin

它讓流並行運行,當流結束時發送存在數組中的最後的值。由於每個流只有最後一個值被發送,它一般用在只發送一個元素的流的情況,就像 HTTP 請求。你讓請求並行運行,在所有流收到響應時執行某些任務。

function handleResponses([user, account]) {
  // 執行某些任務
}

forkJoin(
  fetch("https://server/user/1"),
  fetch("https://server/account/1")
)
.subscribe(handleResponses);

merge / concat

發送每一個從可觀察對象源中發出的值。

merge 接收一個參數,讓你定義有多少流能被同時訂閱。默認是無限制的。設為 1 就意味著監聽一個源流,在它結束的時候訂閱下一個。由於這是一個常見的場景,RxJS 為你提供了一個顯示的函數:concat

merge(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2)),
  timer(0, 1000).pipe(mapTo("Stream 3"), take(2)),
  2 //two concurrent streams
)
.subscribe();

// 只訂閱流 1 和流 2

// 輸出
// Stream 1 -> after 1000ms
// Stream 2 -> after 1200ms
// Stream 1 -> after 2000ms

// 流 1 結束後,開始訂閱流 3

// 輸出
// Stream 3 -> after 0 ms
// Stream 2 -> after 400 ms (2400ms from beginning)
// Stream 3 -> after 1000ms

merge(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2))
  1
)
// 等效於
concat(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2))
)

// 輸出
// Stream 1 -> after 1000ms
// Stream 1 -> after 2000ms
// Stream 2 -> after 3200ms
// Stream 2 -> after 4400ms

zip / combineLatest

mergeconcat 一個接一個的發送所有從源流中讀到的值,而 zipcombineLatest 是把每個流中的一個值結合起來一起發送。zip 結合所有源流中發送的第一個值。如果流的內容相關聯,那麼這就很有用。

zip(
  interval(1000),
  interval(1200),
)
.subscribe();
// 結果
// [0, 0] [1, 1] [2, 2] ...

combineLatest 與之類似,但結合的是源流中發送的最後一個值。直到所有源流至少發送一個值之後才會觸發事件。這之後每次源流發送一個值,它都會把這個值與其他流發送的最後一個值結合起來。

combineLatest(
  interval(1000),
  interval(1200),
)
.subscribe();
// 結果
// [0, 0] [1, 0] [1, 1] [2, 1] ...

兩個函數都讓允許傳遞一個選擇器函數,把元素結合成其它對象而不是數組:

zip(
  interval(1000),
  interval(1200),
  (e1, e2) -> e1 + e2
)
.subscribe();
// 結果
// 0 2 4 6 ...

race

選擇第一個發送數據的流。產生的流基本是最快的。

race(
  interval(1000),
  of("foo")
)
.subscribe();
// 結果
// foo |

由於 of 立即產生一個值,因此它是最快的流,然而這個流就被選中了。

總結

已經有很多創建可觀察對象的方式了。如果你想要創造響應式的 API 或者想用響應式的 API 結合傳統 API,那麼了解這些方法很重要。

我已經向你展示了所有可用的方法,但它們其實還有很多內容可以講。如果你想更加深入地了解,我極力推薦你查閱 文檔 或者閱讀相關文章。

RxViz 是另一種值得了解的有意思的方式。你編寫 RxJS 代碼,產生的流可以用圖形或動畫進行顯示。

via: https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a

作者:Oliver Flaggl 譯者:BriFuture 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出


本文轉載來自 Linux 中國: https://github.com/Linux-CN/archive

對這篇文章感覺如何?

太棒了
0
不錯
0
愛死了
0
不太好
0
感覺很糟
0
雨落清風。心向陽

    You may also like

    Leave a reply

    您的郵箱地址不會被公開。 必填項已用 * 標註

    此站點使用Akismet來減少垃圾評論。了解我們如何處理您的評論數據

    More in:Linux中國