PublishReplay 如何工作

它在內部建立了一個 ReplaySubject 並使其與 multicast 相容。ReplaySubject 的最小重放值是 1 發射。這導致以下結果:

  • 首次訂閱將觸發 publishReplay(1) 內部訂閱源流並通過 ReplaySubject 管理所有排放,有效快取最後 n (= 1)個排放
  • 如果在源仍處於活動狀態時啟動第二個訂閱,則 multicast() 將我們連線到相同的 replaySubject,我們將接收所有下一個排放,直到源流完成。
  • 如果在源已經完成之後啟動訂閱,則 replaySubject 已快取最後 n 個排放,並且它將僅在完成之前接收那些。
const source = Rx.Observable.from([1,2])
  .mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
  .do(null,null,() => console.log('source stream completed'))
  .publishReplay(1)
  .refCount();

// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));

// new subscription after the stream has completed already
setTimeout(() => {
  source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>