限速

遠端服務的常見問題是速率限制。遠端服務允許我們每個時間段僅傳送有限數量的請求或資料量。

在 RxJS 5 中, bufferTime 運算子提供了一個非常類似的功能,特別是如果我們保留第二個引數未指定(它定義了我們想要建立新緩衝區的頻率。如果我們將它保留為 undefined / null,它將建立一個新的緩衝區許可權發射當前的一個)。

bufferTime 的典型用法如下所示:

bufferTime(1000, null, 5)

這將緩衝專案,直到滿足兩個條件之一。然後它將發出緩衝區並啟動另一個緩衝區:

  • 經營者一直在為 1000ms 收集物品
  • 運算子已經收集了 5 專案

出於演示目的,我們可以建立一個非常快速發出的源 Observable,因此 bufferTime 將達到大小限制(5)並且每次發射的次數通常超過一次 7:

const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(75));

然後我們將用 bufferTimeconcatMap 連結它。concatMap 運算子是我們迫使 1000ms 延遲的地方:

const startTime = (new Date()).getTime();

const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(75));

source.bufferTime(1000, null, 5)
  .concatMap(buffer => Observable.of(buffer).delay(1000))
  .timestamp()
  .map(obj => {
    obj.timestamp = obj.timestamp - startTime;
    return obj;
  })
  .subscribe(obj => console.log(obj));

觀看現場演示: https//jsbin.com/kotibow/3/edit?js,console

我們還新增了 timestamp() 來檢視排放時間,以確保延遲真的至少是 1000ms

請注意,我們根本不必使用 Observable.of(buffer)。我們在這裡使用它只是為了手動檢查緩衝項的數量是否正確。

從控制檯輸出我們可以看到兩次排放之間的延遲大致相當於 15:

Timestamp { value: [ 1, 2, 3, 4, 5 ], timestamp: 1475 }
Timestamp { value: [ 6, 7, 8, 9, 10 ], timestamp: 2564 }
Timestamp { value: [ 11, 12, 13, 14, 15 ], timestamp: 3567 }
Timestamp { value: [ 16, 17, 18, 19, 20 ], timestamp: 4572 }
Timestamp { value: [ 21, 22, 23, 24, 25 ], timestamp: 5573 }
Timestamp { value: [], timestamp: 6578 }

現在我們還可以測試源緩慢發出的情況,以便 bufferTime 運算子將達到最大間隔條件:

const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(300));

觀看現場演示: https//jsbin.com/tuwowan/2/edit?js,console

然後輸出應該在關於 2s 之後開始,因為 1s 需要 1s 才能發出,然後我們新增了 1s 延遲;

Timestamp { value: [ 1, 2, 3 ], timestamp: 2017 }
Timestamp { value: [ 4, 5, 6 ], timestamp: 3079 }
Timestamp { value: [ 7, 8, 9, 10 ], timestamp: 4088 }
Timestamp { value: [ 11, 12, 13 ], timestamp: 5093 }
Timestamp { value: [ 14, 15, 16 ], timestamp: 6094 }
Timestamp { value: [ 17, 18, 19, 20 ], timestamp: 7098 }
Timestamp { value: [ 21, 22, 23 ], timestamp: 8103 }
Timestamp { value: [ 24, 25 ], timestamp: 9104 }

如果我們想在現實世界的應用程式中使用這種方法,我們將遠端呼叫放入 concatMap 運算子。通過這種方式,我們可以控制是否要在遠端服務的請求或響應之間強制切換。

例如,我們可以通過在 concatMap 回撥中使用 forkJoin 來強制請求之間的最小 1s 延遲:

function mockHTTPRequest(buffer) {
  return Observable.of(true).delay(Math.random() * 1500)
}

const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(75));

source.bufferTime(1000, null, 5)
  .concatMap(buffer => Observable.forkJoin(
    mockHTTPRequest(buffer),
    Observable.of(buffer).delay(1000)
  ))
  .timestamp()
  .map(obj => {
    obj.timestamp = obj.timestamp - startTime;
    return obj;
  })
  .subscribe(obj => console.log(obj));

觀看現場演示: https//jsbin.com/xijaver/edit?js,console

感謝 forkJoinconcatMap 需要等待兩個 Observable 完成。

另一方面,如果我們想在響應之間強制 1s 延遲,我們只需在 mockHTTPRequest() 之後附加 delay() 運算子:

.concatMap(buffer => mockHTTPRequest(buffer).delay(1000))

觀看現場演示: https//jsbin.com/munopot/2/edit?js,console