先来看如何创建Observable实例

1
2
3
4
5
6
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
}
});

通过调用Observable中的create静态方法

1
2
3
4
5
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

最终返回的是一个ObservableCreate实例,它关联了一个我们创建的ObservableOnSubscribe实例,ObservableCreateObservable的子类。
接着创建Observer实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Object o) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
};

接下来调用observable.subscribe(observer);,我们来看看subscribe方法中做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}

主要的就是subscribeActual(observer);这一步,它的实现在ObservableCreate类中

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

我们可以看到这个方法里首先创建了一个CreateEmitter实例,并把observer关联起来,然后调用observer.onSubscribe(parent);,即执行了我们开始创建Observer时候实现的onSubscribe(Disposable d)方法,并把CreateEmitter实例传递进去。这个CreateEmitter实现了Disposable接口。接下来执行source.subscribe(parent);即调用我们一开始创建obervable实例时 new 的ObservableOnSubscribe实例的subscribe方法,此时执行e.onNext(1);,即CreateEmitter中的onNext方法

1
2
3
4
5
6
7
8
9
10
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}

可以看到最后还是调用了observer.onNext(t);
接下来看看 RxJava 是如何进行线程切换的

1
2
3
4
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

这里添加了两行代码,Schedulers.io()返回是一个IoScheduler实例

1
2
3
4
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

即把当前的ObservableIoScheduler实例关联到新创建的ObservableSubscribeOn实例中去,最后返回这个实例,这个类也是继承于Observable的。此时如果调用.subscribe(observer),那么会走到这里

1
2
3
4
5
6
7
8
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

s.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

和之前类似,先是把Observer实例关联到新创建的SubscribeOnObserver实例中,然后调用Observer实例的onSubscribe方法,然后创建一个SubscribeTask实例,关联实例parent,这个SubscribeTask是一个实现了Runnable接口的类,run()中执行了source.subscribe(parent);这时就大概可以猜到它会放到一个子线程里去执行

1
2
3
4
5
6
7
8
9
10
11
12
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}

接着看scheduler.scheduleDirect(new SubscribeTask(parent))

1
2
3
4
5
6
7
8
9
10
11
12
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

w.schedule(task, delay, unit);

return task;
}

我们来看IoSchedulercreateWorker()是怎么实现的

1
2
3
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}

pool.get()是获取CachedWorkerPool实例,是个实例在IoScheduler实例初始化的时候被创建

1
2
3
4
5
6
7
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}

CachedWorkerPool顾名思义就是ThreadWorker的缓存池,用ConcurrentLinkedQueue保存,接着

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;

final AtomicBoolean once = new AtomicBoolean();

EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}

@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();

// releasing the pool should be the last action
pool.release(threadWorker);
}
}

@Override
public boolean isDisposed() {
return once.get();
}

@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}

return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}

构造方法里调用了pool.get()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}

// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}

这一步其实就是判断缓存队列是不是空的,如果不是,就从中取出一个,否则就创建一个并 add 到allWorkers中。
ok,createWorker()这一步终于走完了,接着创建DisposeTask实例并把之前的WorkerRunnable实例关联进去,下一步执行w.schedule(task, delay, unit);

1
2
3
4
5
6
7
8
9
10
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}

return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

用之前获取到的ThreadWorker实例调用scheduleActual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

我们可以看到,最终提交到executor线程池中执行,最终才会执行source.subscribe(parent);
subscribeOn(Schedulers.io())分析完,接下来看observeOn(AndroidSchedulers.mainThread())是如何执行的

1
2
3
4
private static final class MainHolder {

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
1
2
3
4
5
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
1
2
3
4
5
6
7
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

这里创建了一个ObservableObserveOn实例,继续来看subscribeActual(Observer<? super T> observer)

1
2
3
4
5
6
7
8
9
10
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

scheduler.createWorker();这里返回的是从HandlerScheduler里创建的HandlerWorker(handler),这个handler是与主线程的looper绑定的。

1
2
3
4
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}

接着调用source.subscribesource就是我们调用.subscribeOn(Schedulers.io())返回的ObservableSubscribeOn实例,回到上面讲过的调用流程了。接下来如果执行e.onNext(1);那么就是调用ObserveOnObserver实例中的onNext方法

1
2
3
4
5
6
7
8
9
10
11
@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}

执行schedule();

1
2
3
4
5
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

最终会执行如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
ublic Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");

if (disposed) {
return Disposables.disposed();
}

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.

handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}

return scheduled;
}

因为我们之前讲过handler是绑定主线程的,然后就会在主线程中执行run() 方法,线程切换就在这里完成,接着执行ObserveOnObserver中的run()方法

1
2
3
4
5
6
7
8
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}

接下来会走drainNormal();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void drainNormal() {
int missed = 1;

final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;

for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}

for (;;) {
boolean d = done;
T v;

try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;

if (checkTerminated(d, empty, a)) {
return;
}

if (empty) {
break;
}

a.onNext(v);
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

这里就是从队列中取出消息,然后调用onNext方法。
总结:RxJava 的优点在于链式调用,逻辑非常清楚,切换线程也十分方便,也可以随时中断流程。