基本的例子

排程程式是關於處理單元的 RxJava 抽象。排程程式可以由 Executor 服務支援,但你可以實現自己的排程程式實現。

一個 Scheduler 應該滿足這個要求:

  • 應按順序處理未延遲的任務(FIFO 順序)
  • 任務可以延遲

Scheduler 可以在某些運算子中用作引數(例如:delay),或者與 subscribeOn / observeOn 方法一起使用。

對於某些運算子,Scheduler 將用於處理特定運算子的任務。例如,delay 將安排將發出下一個值的延遲任務。這是一個將保留並稍後執行的 Scheduler

subscribeOn 每個 tihuan 可以使用一次 10。它將定義訂閱的程式碼將在哪個 Scheduler 執行。

observeOn 可以每次使用多次 13。它將定義 Scheduler 將用於執行 observeOn 方法之後定義的所有任務。observeOn 將幫助你執行跳線。

subscribeOn 特定的排程程式

// this lambda will be executed in the `Schedulers.io()`
Observable.fromCallable(() -> Thread.currentThread().getName())
          .subscribeOn(Schedulers.io())
          .subscribe(System.out::println); 

用特定的 Scheduler 觀察

Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
         // next tasks will be executed in the io scheduler
         .observeOn(Schedulers.io())
         .map(str -> str + " -> " + Thread.currentThread().getName())
          // next tasks will be executed in the computation scheduler
         .observeOn(Schedulers.computation())
         .map(str -> str + " -> " + Thread.currentThread().getName())
         // next tasks will be executed in the io scheduler
         .observeOn(Schedulers.newThread())
         .subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));   

使用運算子指定特定的排程程式

一些運算子可以將 Scheduler 作為引數。

Observable.just(1)
          // the onNext method of the delay operator will be executed in a new thread
          .delay(1, TimeUnit.SECONDS, Schedulers.newThread())
          .subscribe(System.out::println);

釋出到訂閱者:

TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

執行緒池:

this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);

Web Socket Observable:

final Subscription subscribe = socket.webSocketObservable()
        .subscribeOn(Schedulers.io())
        .doOnNext(new Action1<RxEvent>() {
            @Override
            public void call(RxEvent rxEvent) {
                System.out.println("Event: " + rxEvent);
            }
        })
        .subscribe();