RxJava<一> rxJava初览

tech2024-10-06  27

首先看下RxJava的入口。由于RxJava 实际上是Observable 的层层构造,中间也设置了Scheduler。下面这个方法是设置Observer的,实际分析RxJava 从此落脚分析是不错的。大家看RxJava 如何高度抽象化,利用Scheduler,调度,层层传递到Observer 的。调度实际主要是线程调度。

public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { //Calls the associated hook function. 实际是调用Hook function的。 observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); //这个接口方法实际上调用的包括在外层的Observalbe 的。一般情况下。通过 Observalbe 的各种Operator 之后生成形形色色的 Observalbe,现在跟进去如何关联Scheduler。此为抽象类。实际上最后的Observable只有subscribeOn 里面形成的 ObservableSubscribeOn,这个是上游的调度。形成的结果会层层调度到 observeOn 形成的ObservableObserveOn 。最后传递到Observe里面去。 等于说各种操作符其实是对应的RxJava的调用逻辑。subscribeOn 虽然传入的是一个Scheduler,比如IoScheduler,但是返回的还是Observable。 observeOn也是如此。IoScheduler 是采用线程池做的调度。observeOn 的Scheduler,比如说是 new HandlerScheduler(new Handler(Looper.getMainLooper()), false) ,采用的主线程的Looper,自然是通过Handler,把Runable,传递到主线程,最后传递给Observe的apply方法。 以上这段只是一部分核心,但是确提供了RxJava的学习思路。结合我上面几篇的Retrofit,Retrofit 最后的落脚点就是生成了两种同步或者异步的Obseravle。比如CallExecuteObservable 和 CallEnqueueObservable。本来Okhttp就是采用线程池技术,Retrofit对他进行了改造了,线程池由外部的Scheduler提供。并且RxJava 还能实现类似Event Bus技术,实在是很牛逼的。 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
最新回复(0)