先来看如何创建Observable
实例
1 2 3 4 5 6 Observable observable = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe (ObservableEmitter e) throws Exception { e.onNext(1 ); } });
通过调用Observable
中的create
静态方法
1 2 3 4 5 public static <T> Observable<T> create (ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null" ); return RxJavaPlugins.onAssembly(new ObservableCreate <T>(source)); }
最终返回的是一个ObservableCreate
实例,它关联了一个我们创建的ObservableOnSubscribe
实例,ObservableCreate
是Observable
的子类。 接着创建Observer
实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Observer observer = new Observer () { @Override public void onSubscribe (Disposable d) { } @Override public void onNext (Object o) { } @Override public void onError (Throwable e) { } @Override public void onComplete () { } };
接下来调用observable.subscribe(observer);
,我们来看看subscribe
方法中做了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public final void subscribe (Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null" ); try { observer = RxJavaPlugins.onSubscribe(this , observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer" ); subscribeActual(observer); } catch (NullPointerException e) { throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException ("Actually not, but can't throw other exceptions due to RS" ); npe.initCause(e); throw npe; } }
主要的就是subscribeActual(observer);
这一步,它的实现在ObservableCreate
类中
1 2 3 4 5 6 7 8 9 10 11 12 @Override protected void subscribeActual (Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter <T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
我们可以看到这个方法里首先创建了一个CreateEmitter
实例,并把observer
关联起来,然后调用observer.onSubscribe(parent);
,即执行了我们开始创建Observer
时候实现的onSubscribe(Disposable d)
方法,并把CreateEmitter
实例传递进去。这个CreateEmitter
实现了Disposable
接口。接下来执行source.subscribe(parent);
即调用我们一开始创建obervable
实例时 new 的ObservableOnSubscribe
实例的subscribe
方法,此时执行e.onNext(1);
,即CreateEmitter
中的onNext
方法
1 2 3 4 5 6 7 8 9 10 @Override public void onNext (T t) { if (t == null ) { onError(new NullPointerException ("onNext called with null. Null values are generally not allowed in 2.x operators and sources." )); return ; } if (!isDisposed()) { observer.onNext(t); } }
可以看到最后还是调用了observer.onNext(t);
。 接下来看看 RxJava 是如何进行线程切换的
1 2 3 4 observable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer);
这里添加了两行代码,Schedulers.io()
返回是一个IoScheduler
实例
1 2 3 4 public final Observable<T> subscribeOn (Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null" ); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn <T>(this , scheduler)); }
即把当前的Observable
和IoScheduler
实例关联到新创建的ObservableSubscribeOn
实例中去,最后返回这个实例,这个类也是继承于Observable
的。此时如果调用.subscribe(observer)
,那么会走到这里
1 2 3 4 5 6 7 8 @Override public void subscribeActual (final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver <T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask (parent))); }
和之前类似,先是把Observer
实例关联到新创建的SubscribeOnObserver
实例中,然后调用Observer
实例的onSubscribe
方法,然后创建一个SubscribeTask
实例,关联实例parent
,这个SubscribeTask
是一个实现了Runnable
接口的类,run()
中执行了source.subscribe(parent);
这时就大概可以猜到它会放到一个子线程里去执行
1 2 3 4 5 6 7 8 9 10 11 12 final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this .parent = parent; } @Override public void run () { source.subscribe(parent); } }
接着看scheduler.scheduleDirect(new SubscribeTask(parent))
1 2 3 4 5 6 7 8 9 10 11 12 @NonNull public Disposable scheduleDirect (@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask (decoratedRun, w); w.schedule(task, delay, unit); return task; }
我们来看IoScheduler
中createWorker()
是怎么实现的
1 2 3 public Worker createWorker () { return new EventLoopWorker (pool.get()); }
pool.get()
是获取CachedWorkerPool
实例,是个实例在IoScheduler
实例初始化的时候被创建
1 2 3 4 5 6 7 @Override public void start () { CachedWorkerPool update = new CachedWorkerPool (KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } }
CachedWorkerPool
顾名思义就是ThreadWorker
的缓存池,用ConcurrentLinkedQueue
保存,接着
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 static final class EventLoopWorker extends Scheduler .Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean (); EventLoopWorker(CachedWorkerPool pool) { this .pool = pool; this .tasks = new CompositeDisposable (); this .threadWorker = pool.get(); } @Override public void dispose () { if (once.compareAndSet(false , true )) { tasks.dispose(); pool.release(threadWorker); } } @Override public boolean isDisposed () { return once.get(); } @NonNull @Override public Disposable schedule (@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } }
构造方法里调用了pool.get()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ThreadWorker get () { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; } while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null ) { return threadWorker; } } ThreadWorker w = new ThreadWorker (threadFactory); allWorkers.add(w); return w; }
这一步其实就是判断缓存队列是不是空的,如果不是,就从中取出一个,否则就创建一个并 add 到allWorkers
中。 ok,createWorker()
这一步终于走完了,接着创建DisposeTask
实例并把之前的Worker
和Runnable
实例关联进去,下一步执行w.schedule(task, delay, unit);
1 2 3 4 5 6 7 8 9 10 @NonNull @Override public Disposable schedule (@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); }
用之前获取到的ThreadWorker
实例调用scheduleActual
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @NonNull public ScheduledRunnable scheduleActual (final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable (decoratedRun, parent); if (parent != null ) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0 ) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null ) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }
我们可以看到,最终提交到executor
线程池中执行,最终才会执行source.subscribe(parent);
subscribeOn(Schedulers.io())
分析完,接下来看observeOn(AndroidSchedulers.mainThread())
是如何执行的
1 2 3 4 private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler (new Handler (Looper.getMainLooper())); }
1 2 3 4 5 @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn (Scheduler scheduler) { return observeOn(scheduler, false , bufferSize()); }
1 2 3 4 5 6 7 @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn (Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null" ); ObjectHelper.verifyPositive(bufferSize, "bufferSize" ); return RxJavaPlugins.onAssembly(new ObservableObserveOn <T>(this , scheduler, delayError, bufferSize)); }
这里创建了一个ObservableObserveOn
实例,继续来看subscribeActual(Observer<? super T> observer)
1 2 3 4 5 6 7 8 9 10 @Override protected void subscribeActual (Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver <T>(observer, w, delayError, bufferSize)); } }
scheduler.createWorker();
这里返回的是从HandlerScheduler
里创建的HandlerWorker(handler)
,这个handler
是与主线程的looper
绑定的。
1 2 3 4 @Override public Worker createWorker () { return new HandlerWorker (handler); }
接着调用source.subscribe
,source
就是我们调用.subscribeOn(Schedulers.io())
返回的ObservableSubscribeOn
实例,回到上面讲过的调用流程了。接下来如果执行e.onNext(1);
那么就是调用ObserveOnObserver
实例中的onNext
方法
1 2 3 4 5 6 7 8 9 10 11 @Override public void onNext (T t) { if (done) { return ; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }
执行schedule();
1 2 3 4 5 void schedule () { if (getAndIncrement() == 0 ) { worker.schedule(this ); } }
最终会执行如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Override ublic Disposable schedule (Runnable run, long delay, TimeUnit unit) { if (run == null ) throw new NullPointerException ("run == null" ); if (unit == null ) throw new NullPointerException ("unit == null" ); if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable (handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this ; handler.sendMessageDelayed(message, Math.max(0L , unit.toMillis(delay))); if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; }
因为我们之前讲过handler
是绑定主线程的,然后就会在主线程中执行run()
方法,线程切换就在这里完成,接着执行ObserveOnObserver
中的run()
方法
1 2 3 4 5 6 7 8 @Override public void run () { if (outputFused) { drainFused(); } else { drainNormal(); } }
接下来会走drainNormal();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 void drainNormal () { int missed = 1 ; final SimpleQueue<T> q = queue; final Observer<? super T> a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return ; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return ; } boolean empty = v == null ; if (checkTerminated(d, empty, a)) { return ; } if (empty) { break ; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0 ) { break ; } } }
这里就是从队列中取出消息,然后调用onNext
方法。 总结:RxJava 的优点在于链式调用,逻辑非常清楚,切换线程也十分方便,也可以随时中断流程。