冷热观测资料

可观察量大致分为 HotCold,具体取决于它们的排放行为。
Cold Observable 是根据请求(订阅)开始发出的,而 Hot Observable 是不管订阅而发出的 Hot Observable


冷可观察

/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milli seconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2

上面代码的输出看起来像(可能会有所不同):

sub1, 0    -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0    -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2

请注意,即使 sub2 启动较晚,它也会从一开始就接收值。总而言之,Cold Observable 仅在需要时发出物品。多个请求启动多个管道。


热观察

注意:Hot observables 会发出独立于各个订阅的值。他们有自己的时间表,无论有人在听,都会发生事件。

Cold Observale 可以通过简单的 publish 转换为 Hot Observable

Observable.interval(500, TimeUnit.MILLISECONDS)
    .publish(); // publish converts cold to hot

publish 返回一个 ConnectableObservable,它增加了与 observable 连接断开**连接的功能。

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // returns ConnectableObservable
hot.connect(); // connect to subscribe

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));

以上输出:

sub1, 0  -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2  -> subscriber2 starts
sub1, 3
sub2, 3

请注意,即使 sub2 开始观察较晚,它也与 sub1 同步。
断开连接有点复杂! 断开发生在 Subscription 而不是 Observable

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription

System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();

以上产生:

sub1, 0   -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2   -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting  -> reconnect after unsubscribe
sub1, 0
...

断开连接后,Observable 基本上终止并在添加新订阅时重新启动。

Hot Observable 可用于创建 EventBus。这种 EventBuses 通常很轻且超快。RxBus 的唯一缺点是必须手动实现所有事件并将其传递给总线。