原理
揭秘RxJava的实现原理
RxJava是响应式编程,基于观察者模式,事件流将从上往下,从订阅源传递到观察者。
Rx框架的优点,它可以避免回调嵌套,更优雅地切换线程实现异步处理数据。配合一些操作符,可以让处理事件流的代码更加简洁,逻辑更加清晰。
Subscriber(观察者、订阅者) 和 Observable(订阅源,被观察者) 显著的;觉察得到的;看得见的
subscribeOn 用于指定被观察者(Observable)执行的线程环境,只能指定一次,多次指定以第一次为准
observeOn 用于指定观察者(Subscriber)执行的线程环境,每次指定完都在下一步生效。
正好相反
每个 Observable 里面有一个 OnSubscribe 对象,只有一个方法(void call(Subscriber<? super T> subscriber);),用来产生数据流,这是典型的命令模式。
Flowable,retrofit返回。
原理
RxJava是响应式编程,基于观察者模式,事件流将从上往下,从订阅源传递到观察者,源码思路就是写个接口(OnSubscribe的call),然后在数据源里调用这个接口的方法。订阅者去监听这个接口的回调。
每调用一次操作符的方法,就相当于在上层数据源和下层观察者之间桥接了一个新的Observable。他去传给下面的订阅者数据
Scheduler(调度器)
调用的 Executors 的线程池,请求完数据通过handler设置到主线程。
SchedulerPoolFactory
1 2 3 4 5 6 7 8 9 10
| public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); tryPutIntoPool(PURGE_ENABLED, exec); return exec; }
public static Scheduler mainThread() { return getInstance().mainThreadScheduler; }
|
自动取消
AutoDispose
.as
(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
在Activity销毁时自己onDispose
不需要Disposable
1 2 3 4 5 6 7 8 9 10 11 12
| class MyActivity { var mDisposable: Disposable? = null fun onCreate(savedInstanceState: Bundle?) { var disposable = Observable.subscribeWith(aObserver) mDisposable?.add(disposable) }
fun onDestroy() { mDisposable?.dispose() }}
|
基础操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
public static void just() {
Integer[] items1 = {1, 2, 3, 4, 5, 6}; Integer[] items2 = {3, 5, 6, 8, 3, 8}; Observable observable = Observable.just(items1, items2); observable.subscribe(new Subscriber<Integer[]>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted"); }
@Override public void onError(Throwable e) { Log.i(TAG, e.getMessage()); }
@Override public void onNext(Integer[] o) { for(int i=0;i<o.length;i++){ Log.i(TAG,"next:"+o[i]); } } }); }
|
Subscriber 对 Observer 接口进行了一些扩展, Subscriber 有 onStart()( 事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置 )、可以返回一个Subscription,取消订阅unsubscribe()
创建被观察者create,from(数组),just(不变参数/俩个数组合并)等
创建观察者:new Subscriber (),也可以new Action()(相当于onnext)rxjava2是 Consumer(肯书妹儿,消费者 ),最后的简化
操作符
map和flatmap
map
对集合或数据进行转化;
map只能单一转换,单一只的是只能一对一进行转换,指一个对象可以转化为另一个对象但是不能转换成对象数组(map返回结果集不能直接使用from/just再次进行事件分发,一旦转换成对象数组的话,再处理集合/数组的结果时需要利用for一一遍历取出(看下面例子),而使用RxJava就是为了剔除这样的嵌套结构,使得整体的逻辑性更强。)
flatmap
适用于嵌套结构(list,数组)
可以单一转换也可以一对多/多对多转换,flatmap 要求返回 Observable,因此可以再内部进行from/just的再次事件分发,一一取出单一对象(转换对象的能力不同)
举例说明
一个Student类中只有一个name,而一个Student却有多门课程(Course)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| List<Student> students = new ArrayList<Student>(); students.add... ... Action1<List<Course>> action1 = new Action1<List<Course>>() { @Override public void call(List<Course> courses) { for (int i = 0; i < courses.size(); i++){ Log.i(TAG, courses.get(i).getName()); } } }; Observable.from(students) .map(new Func1<Student, List<Course>>() { @Override public List<Course> call(Student student) { return student.getCoursesList(); } }) .subscribe(action1);
|
使用flatMap
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| List<Student> students = new ArrayList<Student>(); students.add... ... Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCoursesList()); } }) .subscribe(new Action1<Course>() { @Override public void call(Course course) { Log.i(TAG, course.getName()); } });
|
其他
merge:合并俩个任务;
filter:对集合或数据进行过滤;
toSortedList() :为事件中的数据排序
interval(1, TimeUnit.SECONDS) 创建一个每隔一秒发送一次事件的对象
timestamp() :为每个事件加上一个时间戳
take:取出集合中的前几个
skip:跳过前几个元素
each:遍历集合
1 2 3 4 5 6 7 8 9 10 11
| Observable.from(integers) .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 != 0; } .take(4) .takeLast(2)
|
concat操作符处理多数据源
concat会三个数据源都会请求的。如何使得哪层有数据就用哪层的,之后就不走后面的逻辑了。
可以配合first()操作符来实现这样的效果。例如 对缓存进行检查,如:内存缓存、本地缓存、网络,哪一层有数据立即返回。
注意:Observable.concat只接受相同泛型的参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Observable.concat(memorySource, diskSource, networkSource) .first() .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<String>() { @Override public void call(String s) { printLog(tvLogs, "Getting data from ", s); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { throwable.printStackTrace(); printLog(tvLogs, "Error: ", throwable.getMessage()); } });
|
compose
减少一些相同的操作
1 2 3
| .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(consumer)
|
zip 操作符
可以将多个 Observable (多个网络请求)的数据结合为一个数据源再发射出去。
flatMap 和 concatMap
- concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。
- concatMap和flatMap的功能是一样的, 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。只不过最后合并Observables flatMap采用的merge,而concatMap采用的是连接(concat)。
- flatMap一般用来链式调用接口,请求完一个接口又请求另一个接口
subscribe和subscribeWith
一般subscribeWith用到使用Rx请求接口的这种情况,订阅后把请求参数返回回去,可以添加到CompositeDisposable中方便绑定Activity生命周期取消
subscribe返回值是void所以在请求接口时,最好还是使用subscribeWith,添加订阅关系更方便了
single没有继续
线程
- Schedulers.newThread() 代表一个常规的新线程;通过内部的 Handler 把任务放到主线程去做。
- AndroidSchedulers.mainThread() 代表Android的主线程
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation(): 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用
- Schedulers.io());默认线程数等于处理器的数量
比如传统的创建线程:
1
| new Thread(new Runnable() { @Override public void run() {
|
在RxJava中,我们可以这样实现,这样的好处是我们可以直接使用RxJava的线程调度逻辑。还可以在合适的时机终止线程
1 2 3 4 5 6 7 8
| Scheduler.Worker worker = Schedulers.io().createWorker(); worker.schedule(new Action0() { @Override public void call() { } }); worker.unsubscribe();
|
还有个好处是更容易的实现延迟执行和周期执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Schedulers.computation().createWorker().schedule(new Action0() { @Override public void call() { }}, 500, TimeUnit.MILLISECONDS);
Schedulers.computation().createWorker().schedulePeriodically(new Action0() { @Override public void call() { }}, 500, 250, TimeUnit.MILLISECONDS);
|
处理Activity的生命周期
1.在configuration改变(比如转屏)之后继续之前的Subscription。
比如你使用Retrofit发出了一个REST请求,接着想在listview中展示结果。如果在网络请求的时候用户旋转了屏幕怎么办?你当然想继续刚才的请求,但是怎么搞?
2.Observable持有Context导致的内存泄露
这个问题是因为创建subscription的时候,以某种方式持有了context的引用,尤其是当你和view交互的时候,这太容易发生!如果Observable没有及时结束,内存占用就会越来越大。
解决方案
在生命周期的某个时刻取消订阅。一个很常见的模式就是使用 CompositeDisposable 来持有所有的Subscriptions,然后在onDestroy()或者onDestroyView()里取消所有的订阅。mDisposable.dispose();
真实项目运用
链式调用,就不用调完一个回调再调另一个这样子了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| Subscription subscription = LoginNetManager.getIP() .filter(new Func1<RspIqbDefault<IPModel>, Boolean>() { @Override public Boolean call(RspIqbDefault<IPModel> rsp) { return rsp != null && rsp.isSuccess && rsp.getResultEntity() != null; } }) .flatMap(new Func1<RspIqbDefault<IPModel>, Observable<RspIqbDefault<SmsCodeLoginModel>>>() { @Override public Observable<RspIqbDefault<SmsCodeLoginModel>> call(RspIqbDefault<IPModel> rsp) { IPModel entity = rsp.getResultEntity(); return VerifySmsNetManager.smsCodeLogin(entity.ip, code, phone_number, sms_id, action); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<RspIqbDefault<SmsCodeLoginModel>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { if (mView != null) { mView.smsCodeLogin(null, -1, null); } } @Override public void onNext(RspIqbDefault<SmsCodeLoginModel> rsp) { if (rsp != null) { if (mView != null) { mView.smsCodeLogin(rsp.getResultEntity(), rsp.getErrorCode(), rsp.getErrorMessage()); } } else { if (mView != null) { mView.smsCodeLogin(null, -1, null); } } } }); compositeSubscription.add(subscription);
|
RxBinding
基于RxJava的对于Android原生组件的绑定,是RxJava风格的,相当于代替了OnClick,Listener这些东西
doOnNext
doOnNext()函数执行的地方跟subcribe()中的onNext()执行的地方没有必然联系。 doOnNext()的执行在onNext()之前,可以对数据进行相关处理。例如网络解析返回来的数据先进行处理。也可以直接代替Subscriber
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| userInfoEntityObservable.doOnNext(new Action1<UserInfoEntity>() { @Override public void call(UserInfoEntity userInfoEntity) { PublicMessage publicMsg = new PublicMessage(roomId); EventBus.getDefault().post(event); } }).subscribe();//doOnNext后面可以直接subscribe,不需要参数 //输出[1,2,3,4,5,6,7,8,9,10]中第三个和第四个奇数,take(i) 取前i个事件 takeLast(i) 取后i个事件 doOnNext(Action1) 每次观察者中的onNext调用之前调用,输出before onNext(),5,before onNext(),7 Observable.from(number .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 != 0; } }) //取前四个,从1开始 .take(4) //取前四个中的后两个 .takeLast(2) .doOnNext(new Action1<Integer>() { @Override public void call(Integer integer) { mText.append("before onNext()\n"); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { mText.append("onNext()--->" + integer + "\n"); } }); }
|
PublishSubject
PublishSubject比较容易理解,相对比其他Subject常用,它的Observer只会接收到PublishSubject被订阅之后发送的数据。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| PublishSubject publishSubject = PublishSubject.create(); publishSubject.onNext("publishSubject1"); publishSubject.onNext("publishSubject2"); publishSubject.subscribe(new Observer() { @Override public void onCompleted() { }
@Override public void onError(Throwable e) { }
@Override public void onNext(String s) { LogUtil.log("publishSubject observer1:"+s); } }); publishSubject.onNext("publishSubject3"); publishSubject.onNext("publishSubject4");
|
以上代码,Observer只会接收到”behaviorSubject3”、”behaviorSubject4”。
将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象,和调用一系列的内联操作符是一模一样的。
举个简单的例子,写一个transformer()方法将一个发射整数的Observable转换为发射字符串的Observable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public static <String> ObservableTransformer<Integer, java.lang.String> transformer() { return new ObservableTransformer<Integer, java.lang.String>() { @Override public ObservableSource<java.lang.String> apply(@NonNull Observable<Integer> upstream) { return upstream.map(new Function<Integer, java.lang.String>() { @Override public java.lang.String apply(@NonNull Integer integer) throws Exception { return java.lang.String.valueOf(integer); } }); } }; }
Observable.just(123,456) .compose(transformer()) .subscribe(new Consumer<String>() { @Override public void accept(@io.reactivex.annotations.NonNull String s) throws Exception { System.out.println("s="+s); } });
|
RxJava全局错误处理
在RxJava中rx.pluginsRxJavaPlugins这个类是用来处理发生的错误的,在RxJava的源码中用RxJavaPlugins.getInstance().getErrorHandler().handleError(e);这句来处理错误,这个默认是没有对错误进行任何处理的,我们需要自己来实现这个,
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { @Override public void handleError(Throwable e) { //在这里处理错误 } });