建立背壓資料來源

在處理背壓時,建立背壓資料來源是相對容易的任務,因為庫已經在 Observable 上提供靜態方法來處理開發人員的背壓。我們可以區分兩種工廠方法:基於下游需求返回和生成元素的冷發電機和通常橋接非反應和/或非反壓資料來源的熱推動器,並在上面層疊一些背壓處理。他們。

只是

最基本的背壓感知源是通過 just 建立的:

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // the rest is omitted for brevity
}

由於我們明確地沒有在 onStart 中請求,因此不會列印任何內容。just 很棒,當有一個常數值時,我們想跳一個序列。

不幸的是,just 經常被誤認為是一種動態計算內容以供 Subscribers 使用的方法:

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

令某些人感到驚訝的是,這列印 1 次,而不是分別列印 1 和 2。如果重寫了呼叫,很明顯為什麼它會這樣:

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

computeValue 被稱為主程式的一部分,而不是響應訂閱者的訂閱。

fromCallable

人們真正需要的是方法 fromCallable

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

這裡 computeValue 僅在使用者訂閱並且為每個訂閱者列印預期的 1 和 2 時執行。當然,fromCallable 也適當地支援背壓,並且除非請求,否則不會發出計算值。但請注意,無論如何計算確實會發生。如果計算本身應該延遲到下游實際請求,我們可以使用 justmap

Observable.just("This doesn't matter").map(ignored -> computeValue())...

just 在被對映到 computeValue 的結果時才會發出其常量值,仍然會單獨為每個使用者呼叫。

如果資料已經作為一個物件陣列,一個物件列表或任何 Iterable 源提供,相應的 from 過載將處理這些源的背壓和發射:

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

為了方便(並避免關於通用陣列建立的警告),just 有 2 到 10 個引數過載,內部委託給 from

from(Iterable) 也提供了一個有趣的機會。許多價值生成可以以狀態機的形式表達。每個請求的元素觸發狀態轉換和返回值的計算。

編寫像 Iterables 這樣的狀態機有點複雜(但是比編寫 Observable 更容易使用它)並且與 C#不同,Java 沒有任何編譯器支援通過簡單編寫經典的程式碼來構建這樣的狀態機(使用 yield returnyield break)。一些庫提供了一些幫助,例如 Google Guava 的 AbstractIterable 和 IxJava 的 Ix.generate()Ix.forloop()。這些本身就值得一個完整的系列,所以讓我們看一些非常基本的 Iterable 源,它無限地重複一些常量值:

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

如果我們通過經典 for 迴圈消耗 iterator,那將導致無限迴圈。由於我們從中構建了一個 Observable,我們可以表達我們的意願,只消耗它的前 5 個,然後停止請求任何東西。這是在 Observables 內懶惰評估和計算的真正力量。

建立(SyncOnSubscribe)

有時,要轉換為被動世界本身的資料來源是同步(阻塞)和拉式,也就是說,我們必須呼叫一些 getread 方法來獲取下一段資料。當然,人們可以把它變成一個東西,但是當這些資源與資源相關時,如果下游在它結束之前取消訂閱序列,我們可能會洩漏這些資源。

為了處理這種情況,RxJava 有 SyncOnSubscribe 類。可以擴充套件它並實現其方法或使用其基於 lambda 的工廠方法之一來構建例項。

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

通常,SyncOnSubscribe 使用 3 個回撥。

第一個回撥允許建立一個每使用者狀態,例如示例中的 FileInputStream; 該檔案將獨立開啟給每個使用者。

第二個回撥接受此狀態物件並提供輸出 Observer,其 onXXX 方法可以被呼叫以發出值。此回撥執行的次數與下游請求的次數相同。在每次呼叫時,它必須最多呼叫 onNext 一次,然後選擇 onErroronCompleted。在示例中,如果讀取位元組為負,指示檔案結束,則呼叫 onCompleted(),如果讀取了 IOException,則呼叫 onError

當下遊取消​​訂閱(關閉輸入流)或前一個回撥呼叫終端方法時,將呼叫最終的回撥; 它允許釋放資源。由於並非所有源都需要所有這些功能,因此 SyncOnSubscribe 的靜態方法可以建立沒有它們的例項。

不幸的是,JVM 和其他庫中的許多方法呼叫丟擲了已檢查的異常,需要將其包裝到 try-catches 中,因為此類使用的功能介面不允許丟擲已檢查的異常。

當然,我們可以模仿其他典型的來源,例如無限範圍:

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

在這個設定中,current0 開始,下次呼叫 lambda 時,引數 current 現在保持 1

有一個名為 AsyncOnSubscribeSyncOnSubscribe 的變體看起來非常相似,除了中間回撥也採用表示來自下游的請求數量的長值,並且回撥應該生成具有完全相同長度的 Observable。然後該源將所有這些 Observable 連線成單個序列。

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

關於這個類的有用性正在進行(熱烈)討論,並且通常不推薦,因為它通常會打破關於它將如何實際發出這些生成的值以及它將如何響應的期望,甚至是它將在何時收到的請求值。更復雜的消費者情景。

建立(發射器)

有時,被包裝到 Observable 中的源已經很熱(例如滑鼠移動)或冷卻但在其 API 中不能反壓(例如非同步網路回撥)。

為了處理這種情況,最新版本的 RxJava 引入了 create(emitter) 工廠方法。它需要兩個引數:

  • 將使用 Emitter<T> 介面的例項為每個傳入訂閱者呼叫的回撥,
  • 一個 Emitter.BackpressureMode 列舉,要求開發人員指定要應用的背壓行為。它有通常的模式,類似於 onBackpressureXXX,除了發訊號通知 MissingBackpressureException 或完全忽略它內部的這種溢位。

請注意,它目前不支援那些背壓模式的附加引數。如果需要那些定製,使用 NONE 作為背壓模式並在生成的 Observable 上應用相關的 onBackpressureXXX 是可行的方法。

當人們想要與基於推送的源(例如 GUI 事件)互動時使用的第一種典型情況。這些 API 具有某種形式的 addListener / removeListener 呼叫,可以使用:

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

Emitter 使用相對簡單; 可以在其上呼叫 onNextonErroronCompleted,運算子可以自行處理背壓和取消訂閱管理。此外,如果包裝的 API 支援取消(例如示例中的偵聽器刪除),則可以使用 setCancellation(或 setSubscription 用於類似 Subscription 的資源)來註冊在下游取消訂閱或 onError /時呼叫的取消回撥。在提供的 Emitter 例項上呼叫 onCompleted

這些方法一次只允許一個資源與發射器關聯,而設定一個新資源會自動取消訂閱舊資源。如果必須處理多個資源,請建立一個 CompositeSubscription,將其與發射器關聯,然後將更多資源新增到 CompositeSubscription 本身:

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

第二種情況通常涉及一些非同步的,基於回撥的 API,必須轉換為 Observable

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

在這種情況下,委託以相同的方式工作。不幸的是,通常情況下,這些經典的回撥式 API 不支援取消,但如果他們這樣做,可以像在 previoius 示例中那樣設定取消(儘管可能採用更復雜的方式)。注意使用 LATEST 背壓模式; 如果我們知道只有一個值,我們不需要 BUFFER 策略,因為它分配一個永遠不會被充分利用的預設 128 元素長緩衝區(必要時增長)。