PublishSubject

PublishSubject 僅在訂閱時間之後向源 Observable 發出的那些專案傳送到 Observer

一個簡單的 PublishSubject 示例:

Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();

clock.subscribe(subjectLong);

System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);

輸出:

sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3

在上面的示例中,PublishSubject 訂閱了 Observable,其作用類似於時鐘,並且每 500 毫秒發出一個專案(Long)。如輸出中所示,PublishSubject 傳遞從源(clock)到其訂戶(sub1sub2)的值。

PublishSubject 可以在建立物品後立即開始發射物品,而無需任何觀察者,這會冒一個或多個物品丟失的風險,直到觀察者能夠進行消防。

createClock(); // 3 lines moved for brevity. same as above example

Thread.sleep(5000); // introduces a delay before first subscribe

sub1andsub2(); // 6 lines moved for brevity. same as above example

輸出:

sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13

請注意,sub110 開始發出值。引入的 5 秒延遲導致物品丟失。這些不能再現。這基本上使得 PublishSubject 成為了一個節目。

另外,請注意,如果觀察者在發出 n 個專案之後訂閱了 PublishSubject,則無法為該觀察者再現這些 n 個專案。 **

下面是 PublishSubject 的大理石圖

http://i.stack.imgur.com/UKFxw.jpg

在呼叫源 ObservableonCompleted 之前的任何時間點,PublishSubject 向所有已訂閱的專案傳送專案。

如果源 Observable 以錯誤終止,則 PublishSubject 將不會向後續觀察者發出任何專案,但只會傳遞來自源 Observable 的錯誤通知。

http://i.stack.imgur.com/BlLyD.jpg

用例
假設你要建立一個應用程式,該應用程式將監視某個公司的股票價格並將其轉發給所有請求它的客戶。

/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);

/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);

/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();

在上面的示例用例中,PublishSubject 充當橋接器,將值從伺服器傳遞到訂閱 watcher 的所有客戶端。

進一步閱讀: