RxJava系列文章(三)
RxJava系列文章RxJava系列文章(一) - 网络图片加载水印一般写法RxJava系列文章(二) - 网络图片添加水印RxJava写法RxJava系列文章(三) - 线程调度切换源码分析RxJava系列文章(四) - 普通观察者与RxJava的观察者RxJava系列文章(五)- CopyOnWriteArrayList与ConcurrentLinkedQueueRxJava系列文章(六)- new Handler()与new Handler(Looper.getMainLooper())区别RxJava系列文章(七) - 你是否了解RxJavaRxJava系列文章(八) - RxPermission
1. 概述这篇文章主要记录下 RxJava中线程调度切换源码分析:.subscribeOn(Schedulers.io());.observeOn(AndroidSchedulers.mainThread());
其实所有第三方框架都有共性,都是基于最原始的框架的封装,其本质是不会变的,比如:1>:ButterKnife - findViewById;2>:OkHttp -> socket + okio;3>:RxJava -> 线程池 + handler;
2. 解析子线程切换:subscribeOn(Schedulers.io())这句代码意思就是:它上边所有操作都是在子线程中执行的
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent=parent; } @Override public void run() { source.subscribe(parent); } }
scheduler.scheduleDirect()调用了scheduler.io() ,io其实就是:IO=RxJavaPlugins.initIoScheduler(new IOTask());-> DEFAULT=new IoScheduler();-> 创建一个线程池的封装对象
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { // createWorker() -> IOScheduler 的 createWorker() -> EventLoopWorker final Worker w=createWorker(); final Runnable decoratedRun=RxJavaPlugins.onSchedule(run); DisposeTask task=new DisposeTask(decoratedRun, w); // w.schedule -> EventLoopWorker的 schedule w.schedule(task, delay, unit); return task; }
execute 指的是 schedule;submit 指的是 schedule;
对上游处理是包裹
3. 解析主线程切换:observeOn(AndroidSchedulers.mainThread()) // MainThreadScheduler 策略Scheduler.Worker w=scheduler.createWorker();// 调用上游的 subscribe(),对下游的 observer 进行代理包裹 ObserveOnObserversource.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
对下游的处理:worker.schedule(this);schedule()方法如下:
@Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ScheduledRunnable scheduled=new ScheduledRunnable(handler, run); Message message=Message.obtain(handler, scheduled); message.obj=this; // Used as token for batch disposal of this worker's runnables. // 这里只是把消息发送出去,但是消息并没有执行 handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay))); return scheduled; }
作者博客
http://www.cherylgood.cn
前言
基于RxJava2.1.1
我们在前面的 RxJava2.0使用详解(一)初步分析了RxJava从创建到执行的流程。RxJava2.0使用详解(二) 中分析了RxJava的随意终止Reactive流的能力的来源;也明白了RxJava的onComplete;与onError(t);只有一个会被执行的秘密。RxJava2.X 源码分析(三)中探索了RxJava2调用subscribeOn切换被观察者线程的原理。
本次我们将继续探索RxJava2.x切换观察者的原理,分析observeOn与subscribeOn的不同之处。继续实现我们在第一篇中定下的小目标
从Demo到原理
OK,我们的Demo还是上次的demo,忘记了的小伙伴可以点击RxJava2.X 源码分析(三),这里就不再重复了哦,我们直接进入正题。
Ok,按照套路,我们从observeOn方法入手。
Ok,我点~^_^
我们继续往下看,我猜套路跟subscribeOn的逃不多,也是采用装饰者模式,wrapper我们的Observable和Observer产生一个中间被观察者和观察中,通过中间被观察者订阅上游被观察者,通过中间观察者接收上游被观察者下发的数据,然后通过线程切换将数据传递给下游观察者。
Ok,我们来验证下才想。我觉得就是没完全猜对,也能猜对其中的大部分。
Ok,熟悉的RxJavaPlugins.onAssemblyhook处理,略过,直接看new ObservableObserveOn(this, scheduler, delayError, bufferSize)这句
Ok,果然,熟悉的模式,对我们上游的Observable,下游的Observerwrapper一次。
1、ObservableObserveOn继承了AbstractObservableWithUpstream
2、source保存上游的Observable
3、scheduler为本次的调度器
4、在下游调用subscribe订阅时触发->subscribeActual->Wrapper了下游的Observer观察者
3处:source为游Observable,下游Observer被wrapper到ObserveOnObserver,发生订阅数件,上游Observable开始执行subscribeActual,调用ObserveOnObserver的onSubscribe以及onNext、onError、onComplete等
OK,我们接着看Observer被包装进 ObserveOnObserver的样子,代码有点多,我们分段讲解
OK,执行玩这里之后,就到我们的onXX方法了
首先可无限调用的onNext
其次只能触发一次的onError,基本差不多
同样是只能触发一次的onComplete,同样的套路,就不说了
然后就是我们的关键点schedule;
什么?传入了this?那么说明什么呢?( ̄? ̄)
嗯?this是个runnable,没错,我们的ObserveOnObserver实现了Runnable接口
那么,接下来自然是调用run方法
好吧,在看drainNormal前,我们先看一个函数
true:1、订阅被取消cancelled==true,2、done==true onNext刚被调度完,onError或者onCompele被调用
继续看drainNormal
总结
Ok,看到这里我们基本了解了observeOn的实现流程,同样是老套路,使用装饰者模式,中间Wrapper了我们的Observable和Observer,通过中间增加一个Observable和Observer来实现线程的切换。
上一篇:remain的用法
发表评论