原理
揭秘RxJava的实现原理
RxJava是响应式编程,基于观察者模式,事件流将从上往下,从订阅源传递到观察者。
Rx框架的优点,它可以避免回调嵌套,更优雅地切换线程实现异步处理数据。配合一些操作符,可以让处理事件流的代码更加简洁,逻辑更加清晰。
Subscriber(观察者、订阅者) 和 Observable(订阅源,被观察者) 显著的;觉察得到的;看得见的
subscribeOn 用于指定被观察者(Observable)执行的线程环境,只能指定一次,多次指定以第一次为准
observeOn 用于指定观察者(Subscriber)执行的线程环境,每次指定完都在下一步生效。
正好相反
每个 Observable 里面有一个 OnSubscribe 对象,只有一个方法(void call(Subscriber<? super T> subscriber);),用来产生数据流,这是典型的命令模式。
Flowable,retrofit返回。
原理
RxJava是响应式编程,基于观察者模式,事件流将从上往下,从订阅源传递到观察者,源码思路就是写个接口(OnSubscribe的call),然后在数据源里调用这个接口的方法。订阅者去监听这个接口的回调。
每调用一次操作符的方法,就相当于在上层数据源和下层观察者之间桥接了一个新的Observable。他去传给下面的订阅者数据

Scheduler(调度器)
调用的 Executors 的线程池,请求完数据通过handler设置到主线程。
SchedulerPoolFactory
1 2 3 4 5 6 7 8 9 10
   |   public static ScheduledExecutorService create(ThreadFactory factory) {         final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);         tryPutIntoPool(PURGE_ENABLED, exec);         return exec;     }
 
  public static Scheduler mainThread() { return getInstance().mainThreadScheduler; }
 
 
 
  | 
 
自动取消
AutoDispose
.as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
在Activity销毁时自己onDispose
不需要Disposable
1 2 3 4 5 6 7 8 9 10 11 12
   | class MyActivity {        var mDisposable: Disposable? = null        fun onCreate(savedInstanceState: Bundle?) {       var disposable = Observable.subscribeWith(aObserver)       mDisposable?.add(disposable)   }
    fun onDestroy() {       mDisposable?.dispose()   }}
 
  | 
 
基础操作

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
   |  
 
      public static void just() {
          Integer[] items1 = {1, 2, 3, 4, 5, 6};         Integer[] items2 = {3, 5, 6, 8, 3, 8};         Observable observable = Observable.just(items1, items2);         observable.subscribe(new Subscriber<Integer[]>() {             @Override             public void onCompleted() {                 Log.i(TAG, "onCompleted");             }
              @Override             public void onError(Throwable e) {                 Log.i(TAG, e.getMessage());             }
              @Override             public void onNext(Integer[] o) {                 for(int i=0;i<o.length;i++){                     Log.i(TAG,"next:"+o[i]);                 }             }         });     }
 
  | 
 
Subscriber 对 Observer 接口进行了一些扩展, Subscriber 有 onStart()( 事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置 )、可以返回一个Subscription,取消订阅unsubscribe()
 
创建被观察者create,from(数组),just(不变参数/俩个数组合并)等
 
创建观察者:new Subscriber (),也可以new Action()(相当于onnext)rxjava2是 Consumer(肯书妹儿,消费者 ),最后的简化
 
操作符
map和flatmap
map
对集合或数据进行转化;
map只能单一转换,单一只的是只能一对一进行转换,指一个对象可以转化为另一个对象但是不能转换成对象数组(map返回结果集不能直接使用from/just再次进行事件分发,一旦转换成对象数组的话,再处理集合/数组的结果时需要利用for一一遍历取出(看下面例子),而使用RxJava就是为了剔除这样的嵌套结构,使得整体的逻辑性更强。)
flatmap
适用于嵌套结构(list,数组)
可以单一转换也可以一对多/多对多转换,flatmap 要求返回 Observable,因此可以再内部进行from/just的再次事件分发,一一取出单一对象(转换对象的能力不同)
举例说明
一个Student类中只有一个name,而一个Student却有多门课程(Course)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   | List<Student> students = new ArrayList<Student>(); students.add... ...      Action1<List<Course>> action1 = new Action1<List<Course>>() {     @Override     public void call(List<Course> courses) {                   for (int i = 0; i < courses.size(); i++){             Log.i(TAG, courses.get(i).getName());         }     } }; Observable.from(students)         .map(new Func1<Student, List<Course>>() {             @Override             public List<Course> call(Student student) {                                  return student.getCoursesList();             }         })         .subscribe(action1);
 
  | 
 
使用flatMap
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | List<Student> students = new ArrayList<Student>(); students.add... ...      Observable.from(students)         .flatMap(new Func1<Student, Observable<Course>>() {             @Override             public Observable<Course> call(Student student) {                 return Observable.from(student.getCoursesList());             }         })         .subscribe(new Action1<Course>() {             @Override             public void call(Course course) {                 Log.i(TAG, course.getName());             }         });     
 
  | 
 
其他
merge:合并俩个任务;
filter:对集合或数据进行过滤;
toSortedList() :为事件中的数据排序
interval(1, TimeUnit.SECONDS) 创建一个每隔一秒发送一次事件的对象
timestamp() :为每个事件加上一个时间戳
take:取出集合中的前几个
skip:跳过前几个元素
each:遍历集合
1 2 3 4 5 6 7 8 9 10 11
   |        Observable.from(integers)                     .filter(new Func1<Integer, Boolean>() {                                                  @Override                         public Boolean call(Integer integer) {                             return integer % 2 != 0;                         }                                             .take(4)                                          .takeLast(2)
 
  | 
 
concat操作符处理多数据源
concat会三个数据源都会请求的。如何使得哪层有数据就用哪层的,之后就不走后面的逻辑了。 
可以配合first()操作符来实现这样的效果。例如 对缓存进行检查,如:内存缓存、本地缓存、网络,哪一层有数据立即返回。
注意:Observable.concat只接受相同泛型的参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
   | Observable.concat(memorySource, diskSource, networkSource)                         .first()                         .observeOn(AndroidSchedulers.mainThread())                         .subscribe(new Action1<String>() {                             @Override                             public void call(String s) {                                 printLog(tvLogs, "Getting data from ", s);                             }                         }, new Action1<Throwable>() {                             @Override                             public void call(Throwable throwable) {                                 throwable.printStackTrace();                                 printLog(tvLogs, "Error: ", throwable.getMessage());                             }                         });
 
  | 
 
compose
减少一些相同的操作
1 2 3
   | .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(consumer)
 
  | 
 
zip 操作符
可以将多个 Observable (多个网络请求)的数据结合为一个数据源再发射出去。
flatMap 和 concatMap
- concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。
 
- concatMap和flatMap的功能是一样的, 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。只不过最后合并Observables flatMap采用的merge,而concatMap采用的是连接(concat)。
 
- flatMap一般用来链式调用接口,请求完一个接口又请求另一个接口
 
subscribe和subscribeWith
一般subscribeWith用到使用Rx请求接口的这种情况,订阅后把请求参数返回回去,可以添加到CompositeDisposable中方便绑定Activity生命周期取消
subscribe返回值是void所以在请求接口时,最好还是使用subscribeWith,添加订阅关系更方便了
single没有继续
线程
- Schedulers.newThread() 代表一个常规的新线程;通过内部的 Handler 把任务放到主线程去做。
 
- AndroidSchedulers.mainThread() 代表Android的主线程
 
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
 
- Schedulers.computation(): 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用
 
- Schedulers.io());默认线程数等于处理器的数量
 
比如传统的创建线程:
1
   | new Thread(new Runnable() { @Override public void run() { 
 
  | 
 
在RxJava中,我们可以这样实现,这样的好处是我们可以直接使用RxJava的线程调度逻辑。还可以在合适的时机终止线程
1 2 3 4 5 6 7 8
   | Scheduler.Worker worker = Schedulers.io().createWorker(); worker.schedule(new Action0() {     @Override     public void call() {              } }); worker.unsubscribe();
 
  | 
 
还有个好处是更容易的实现延迟执行和周期执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
   |  Schedulers.computation().createWorker().schedule(new Action0() {     @Override     public void call() {              }}, 500, TimeUnit.MILLISECONDS);
 
 
  Schedulers.computation().createWorker().schedulePeriodically(new Action0() {     @Override     public void call() {              }}, 500, 250, TimeUnit.MILLISECONDS);
 
  | 
 
处理Activity的生命周期
1.在configuration改变(比如转屏)之后继续之前的Subscription。
比如你使用Retrofit发出了一个REST请求,接着想在listview中展示结果。如果在网络请求的时候用户旋转了屏幕怎么办?你当然想继续刚才的请求,但是怎么搞?
2.Observable持有Context导致的内存泄露
这个问题是因为创建subscription的时候,以某种方式持有了context的引用,尤其是当你和view交互的时候,这太容易发生!如果Observable没有及时结束,内存占用就会越来越大。
解决方案
在生命周期的某个时刻取消订阅。一个很常见的模式就是使用 CompositeDisposable 来持有所有的Subscriptions,然后在onDestroy()或者onDestroyView()里取消所有的订阅。mDisposable.dispose();
真实项目运用
链式调用,就不用调完一个回调再调另一个这样子了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
   |       Subscription subscription = LoginNetManager.getIP()                     .filter(new Func1<RspIqbDefault<IPModel>, Boolean>() {                         @Override                         public Boolean call(RspIqbDefault<IPModel> rsp) {                             return rsp != null && rsp.isSuccess && rsp.getResultEntity() != null;                         }                     })                     .flatMap(new Func1<RspIqbDefault<IPModel>, Observable<RspIqbDefault<SmsCodeLoginModel>>>() {                         @Override                         public Observable<RspIqbDefault<SmsCodeLoginModel>> call(RspIqbDefault<IPModel> rsp) {                             IPModel entity = rsp.getResultEntity();                             return VerifySmsNetManager.smsCodeLogin(entity.ip, code, phone_number, sms_id, action);                         }                     })                     .observeOn(AndroidSchedulers.mainThread())                     .subscribe(new Subscriber<RspIqbDefault<SmsCodeLoginModel>>() {                         @Override                         public void onCompleted() {                         }                                   @Override                         public void onError(Throwable e) {                             if (mView != null) {                                 mView.smsCodeLogin(null, -1, null);                             }                         }                              @Override                         public void onNext(RspIqbDefault<SmsCodeLoginModel> rsp) {                             if (rsp != null) {                                 if (mView != null) {                                     mView.smsCodeLogin(rsp.getResultEntity(), rsp.getErrorCode(), rsp.getErrorMessage());                                 }                             } else {                                 if (mView != null) {                                     mView.smsCodeLogin(null, -1, null);                                 }                             }                         }                     });             compositeSubscription.add(subscription);     
 
  | 
 
RxBinding
基于RxJava的对于Android原生组件的绑定,是RxJava风格的,相当于代替了OnClick,Listener这些东西
doOnNext
doOnNext()函数执行的地方跟subcribe()中的onNext()执行的地方没有必然联系。 doOnNext()的执行在onNext()之前,可以对数据进行相关处理。例如网络解析返回来的数据先进行处理。也可以直接代替Subscriber
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
   |    userInfoEntityObservable.doOnNext(new Action1<UserInfoEntity>() {                     @Override                     public void call(UserInfoEntity userInfoEntity) {                         PublicMessage publicMsg = new PublicMessage(roomId);                         EventBus.getDefault().post(event);                     }                 }).subscribe();//doOnNext后面可以直接subscribe,不需要参数 //输出[1,2,3,4,5,6,7,8,9,10]中第三个和第四个奇数,take(i) 取前i个事件 takeLast(i) 取后i个事件 doOnNext(Action1) 每次观察者中的onNext调用之前调用,输出before onNext(),5,before onNext(),7 Observable.from(number                 .filter(new Func1<Integer, Boolean>() {                     @Override                     public Boolean call(Integer integer) {                         return integer % 2 != 0;                     }                 })                 //取前四个,从1开始                 .take(4)                 //取前四个中的后两个                 .takeLast(2)                 .doOnNext(new Action1<Integer>() {                     @Override                     public void call(Integer integer) {                         mText.append("before onNext()\n");                     }                 })                 .subscribe(new Action1<Integer>() {                     @Override                     public void call(Integer integer) {                         mText.append("onNext()--->" + integer + "\n");                     }                 });     }
 
  | 
 
PublishSubject
PublishSubject比较容易理解,相对比其他Subject常用,它的Observer只会接收到PublishSubject被订阅之后发送的数据。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
   | PublishSubject publishSubject = PublishSubject.create();   publishSubject.onNext("publishSubject1");   publishSubject.onNext("publishSubject2");   publishSubject.subscribe(new Observer() {           @Override           public void onCompleted() {           }
            @Override           public void onError(Throwable e) {           }
            @Override           public void onNext(String s) {               LogUtil.log("publishSubject observer1:"+s);           }       });   publishSubject.onNext("publishSubject3");   publishSubject.onNext("publishSubject4");
 
  | 
 
以上代码,Observer只会接收到”behaviorSubject3”、”behaviorSubject4”。
将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象,和调用一系列的内联操作符是一模一样的。
举个简单的例子,写一个transformer()方法将一个发射整数的Observable转换为发射字符串的Observable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   | public static <String> ObservableTransformer<Integer, java.lang.String> transformer() {         return new ObservableTransformer<Integer, java.lang.String>() {             @Override             public ObservableSource<java.lang.String> apply(@NonNull Observable<Integer> upstream) {                 return upstream.map(new Function<Integer, java.lang.String>() {                     @Override                     public java.lang.String apply(@NonNull Integer integer) throws Exception {                         return java.lang.String.valueOf(integer);                     }                 });             }         };     }
  Observable.just(123,456)        .compose(transformer())        .subscribe(new Consumer<String>() {               @Override                public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {                    System.out.println("s="+s);                     }                 });
 
  | 
 
RxJava全局错误处理
在RxJava中rx.pluginsRxJavaPlugins这个类是用来处理发生的错误的,在RxJava的源码中用RxJavaPlugins.getInstance().getErrorHandler().handleError(e);这句来处理错误,这个默认是没有对错误进行任何处理的,我们需要自己来实现这个,
 RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { @Override public void handleError(Throwable e) { //在这里处理错误 } });