onBackpressureXXX 運算子

當應用程式使用 MissingBackpressureException 失敗時,大多數開發人員會遇到背壓,而異常通常指向 observeOn 運算子。實際原因通常是通過 create() 建立的 PublishSubjecttimer()interval() 或自定義運算子的非反壓使用。

有幾種方法可以處理這種情況。

增加緩衝區大小

有時這種溢位是由於突發源而發生的。突然,使用者太快地點選了螢幕,並且 Android 上的 observeOn 預設的 16 元素內部緩衝區溢位。

最近版本的 RxJava 中的大多數背壓敏感運算子現在允許程式設計師指定其內部緩衝區的大小。相關引數通常稱為 bufferSizeprefetchcapacityHint。鑑於介紹中的溢位示例,我們可以增加 observeOn 的緩衝區大小,以便為所有值提供足夠的空間。

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

但請注意,通常情況下,這可能只是一個臨時修復,因為如果源過量生成預測的緩衝區大小,仍可能發生溢位。在這種情況下,可以使用以下運算子之一。

使用標準運算子批處理/跳過值

如果可以批量處理源資料,則可以通過使用標準批處理操作符之一(按大小和/或按時間)來降低 MissingBackpressureException 的可能性。

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

如果可以安全地忽略某些值,可以使用取樣(使用時間或其他 Observable)和限制運算子(throttleFirstthrottleLastthrottleWithTimeout)。

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

請注意,這些運算子只會降低下游的價值接收率,因此它們仍然可能會導致這種情況發生。

onBackpressureBuffer()

該無操作器形式的運算子在上游源和下游運算子之間重新引入無界緩衝區。無限制意味著只要 JVM 沒有記憶體不足,它就可以處理來自突發源的幾乎任何數量。

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

在這個例子中,observeOn 具有非常低的緩衝區大小但是沒有 MissingBackpressureException,因為 onBackpressureBuffer 吸收所有 100 萬個值並將小批量交給 observeOn

但請注意,onBackpressureBuffer 以無限制的方式消耗其來源,即不對其施加任何背壓。這導致即使是背壓支援源如 range 也將完全實現。

onBackpressureBuffer 還有 4 個額外的過載

onBackpressureBuffer(int capacity)

這是一個有界的版本,在其緩衝區達到給定容量的情況下發出訊號。

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

隨著越來越多的運算子現在允許設定其緩衝區大小,此運算子的相關性正在下降。對於其餘部分,這提供了一個擴充套件其內部緩衝區的機會,通過使用 onBackpressureBuffer 比預設值更大的數字。

onBackpressureBuffer(int capacity,Action0 onOverflow)

如果發生溢位,此過載會呼叫(共享)操作。它的用處相當有限,因為沒有提供有關溢位的其他資訊而不是當前的呼叫堆疊。

onBackpressureBuffer(int capacity,Action0 onOverflow,BackpressureOverflow.Strategy strategy)

這種過載實際上更有用,因為它可以定義在達到容量時要做什麼。BackpressureOverflow.Strategy 實際上是一個介面,但是類 BackpressureOverflow 提供了 4 個靜態欄位,其實現代表了典型的操作:

  • ON_OVERFLOW_ERROR:這是前兩個過載的預設行為,表示一個 BufferOverflowException
  • ON_OVERFLOW_DEFAULT:目前和 ON_OVERFLOW_ERROR 相同
  • ON_OVERFLOW_DROP_LATEST:如果發生溢位,將簡單地忽略當前值,並且只有下游請求才會傳遞舊值。
  • ON_OVERFLOW_DROP_OLDEST:刪除緩衝區中最舊的元素並將當前值新增到緩衝區中。
Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

請注意,最後兩個策略會導致流中斷,因為它們會丟棄元素。此外,他們不會發出訊號 34。

onBackpressureDrop()

每當下游未準備好接收值時,此運算子將從序列中刪除該 elemenet。人們可以將其視為 0 容量 onBackpressureBuffer 與策略 ON_OVERFLOW_DROP_LATEST

當人們可以安全地忽略來自源的值(例如滑鼠移動或當前 GPS 位置訊號)時,此運算子非常有用,因為稍後會有更多的最新值。

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

它可能與源操作符 interval() 結合使用。例如,如果想要執行某些定期後臺任務但每次迭代的持續時間可能比期間長,則可以安全地刪除多餘的間隔通知,因為稍後會有更多:

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

此運算子存在一個過載:onBackpressureDrop(Action1<? super T> onDrop),其中呼叫(共享)操作並刪除該值。該變體允許自身清理值(例如,釋放相關資源)。

onBackpressureLatest()

最終運算子僅保留最新值,並實際覆蓋較舊的未傳遞值。人們可以將此視為 onBackpressureBuffer 的變體,其容量為 1,策略為 ON_OVERFLOW_DROP_OLDEST

onBackpressureDrop 不同,如果下游恰好落後,總有一個可供消費的價值。這在類似遙測的情況下非常有用,在這種情況下,資料可能會以某種突發模式出現,但只有最新的資料才會對處理感興趣。

例如,如果使用者在螢幕上點選了很多,我們仍然希望對其最新輸入作出反應。

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

在這種情況下使用 onBackpressureDrop 會導致最後一次點選被丟棄並讓使用者想知道為什麼沒有執行業務邏輯。