快取 HTTP 響應

RxJS 的典型用例是建立 HTTP 請求並在一段時間內快取其結果。此外,我們總是希望一次只執行一個請求並共享其響應。

例如,以下程式碼快取 1 項最大值。 1000ms

var updateRequest = Observable.defer(() => makeMockHttpRequest())
    .publishReplay(1, 1000)
    .refCount()
    .take(1);

var counter = 1;    
function makeMockHttpRequest() {
    return Observable.of(counter++)
        .delay(100);
}

function requestCachedHttpResult() {
    return updateRequest;
}

函式 makeMockHttpRequest() 模擬以 100ms 延遲到達的 HTTP 請求。

函式 requestCachedHttpResult() 是我們訂閱獲取實際或快取響應的地方。

使用 .publishReplay(1, 1000),我們使用 RxJS 組播在內部使用 ReplaySubject 並保留 1 專案以獲得最大的 1000ms。然後 refCount() 用於始終只保留 source 的一個訂閱,即 Observable.defer()。此 Observable 用於建立新請求並遞增 counter 以證明快取的值和新訂閱共享相同的資料。

當我們想要獲取當前資料時,我們稱之為 requestCachedHttpResult()。為了確保在釋出資料後 Observer 將正確完成,我們使用了 take(1) 運算子。

requestCachedHttpResult()
    .subscribe(val => console.log("Response 0:", val));

這會使用 mockDataFetch() 建立一個請求並列印到控制檯:

1

一個更復雜的示例將在我們想要測試模擬的 HTTP 連線和響應被共享的不同時間呼叫多個請求。

requestCachedHttpResult()
    .subscribe(val => console.log("Response 0:", val));

setTimeout(() => requestCachedHttpResult()
    .subscribe(val => console.log("Response 50:", val))
, 50);

setTimeout(() => requestCachedHttpResult()
    .subscribe(val => console.log("Response 200:", val))
, 200);

setTimeout(() => requestCachedHttpResult()
    .subscribe(val => console.log("Response 1200:", val))
, 1200);

setTimeout(() => requestCachedHttpResult()
    .subscribe(val => console.log("Response 1500:", val))
, 1500);

setTimeout(() => requestCachedHttpResult()
    .subscribe(val => console.log("Response 3500:", val))
, 3500);

觀看現場演示: https//jsbin.com/todude/5/edit?js,console

每個請求都會延遲傳送,並按以下順序傳送:

0 - 首次請求使 refCount() 訂閱其 source,這使得 mockDataFetch() 呼叫。它的迴應將被推遲 22。此時 publishReplay() 內的 publishReplay() 運算子有一個 Observer。

50 - 第二個請求也訂閱了 ConnectableObservable。此時 ConnectableObservable 裡面的 publishReplay() 運算子有兩個 Observer。它不會使用 makeMockHttpRequest() 建立另一個請求,因為 refCount() 已經訂閱了。

100 - 第一個響應準備就緒。它首先由 ReplaySubject 快取,然後重新傳送給訂閱 ConnectableObservable兩位觀察者。由於 take(1) 和取消訂閱,兩位觀察員都完成了。

200 - 訂閱 ReplaySubject,它立即發出快取值,使 take(1) 完成 Observer 並立即取消訂閱。沒有發出 HTTP 請求,也沒有訂閱。

1200 - 與 0 的第一個匹配相同。此時快取的值已被丟棄,因為它比 1000ms 舊。

1500 - 與 200 的第四次活動​​相同。

3500 - 與 1200 的第一個匹配相同。

控制檯中的輸出如下:

Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 2
Response 1500: 2
Response 3500: 3

在 RxJS 5 中,cache() 運算子涵蓋了類似的功能。然而,由於其功能有限,它在 5.0.0-rc.1被刪除。

處理錯誤

如果我們想要處理遠端服務(makeMockHttpRequest 函式)產生的錯誤,我們需要在它們合併到主 Observable 鏈之前捕獲它們,因為 publishReplay() 內部的 ReplaySubject 收到的任何錯誤都會將其內部狀態標記為 stopped(閱讀更多這裡主題和它的內部狀態 )這絕對不是我們想要的。

在下面的例子中,我們在 counter === 2 模擬錯誤並用 catch() 操作符捕獲它。我們使用 catch() 只將 error 通知轉換為常規 next,以便我們可以在觀察者中處理錯誤:

function makeMockHttpRequest() {
    return Observable.of(counter++)
        .delay(100)
        .map(i => {
            if (i === 2) {
                throw new Error('Invalid URL');
            }
            return i;
        })
        .catch(err => Observable.of(err));
}

觀看現場演示: https//jsbin.com/kavihu/10/edit?js,console

這將列印以控制以下輸出。請注意 next 處理程式中收到的錯誤:

Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: [object Error] { ... }
Response 1500: [object Error] { ... }
Response 3500: 3

如果我們想要在每個觀察者中處理常規 error 通知中的錯誤,我們必須在 publishReplay() 運算子之後重新丟擲它們,原因如上所述。

var updateRequest = Observable.defer(() => makeMockHttpRequest())
    .publishReplay(1, 1000)
    .refCount()
    .take(1)
    .map(val => {
        if (val instanceof Error) {
            throw val;
        }
        return val;
    });

請參閱現場演示: https//jsbin.com/fabosam/5/edit? js, console (請注意,我們必須為每個觀察者新增錯誤回撥)。

Response 0: 1
Response 50: 1
Response 200: 1
error callback: Error: Invalid URL
error callback: Error: Invalid URL
Response 3500: 3