Rxjava

原理

揭秘RxJava的实现原理
RxJava是响应式编程,基于观察者模式,事件流将从上往下,从订阅源传递到观察者。
Rx框架的优点,它可以避免回调嵌套,更优雅地切换线程实现异步处理数据。配合一些操作符,可以让处理事件流的代码更加简洁,逻辑更加清晰。

Subscriber(观察者、订阅者) 和 Observable(订阅,被观察者) 显著的;觉察得到的;看得见的
subscribeOn 用于指定被观察者(Observable)执行的线程环境,只能指定一次,多次指定以第一次为准
observeOn 用于指定观察者(Subscriber)执行的线程环境,每次指定完都在下一步生效。
正好相反

每个 Observable 里面有一个 OnSubscribe 对象,只有一个方法(void call(Subscriber<? super T> subscriber);),用来产生数据流,这是典型的命令模式。
Flowable,retrofit返回。

原理
RxJava是响应式编程,基于观察者模式,事件流将从上往下,从订阅源传递到观察者,源码思路就是写个接口(OnSubscribe的call),然后在数据源里调用这个接口的方法。订阅者去监听这个接口的回调。
每调用一次操作符的方法,就相当于在上层数据源和下层观察者之间桥接了一个新的Observable。他去传给下面的订阅者数据
unknown_filename

Scheduler(调度器)
调用的 Executors 的线程池,请求完数据通过handler设置到主线程。
SchedulerPoolFactory

1
2
3
4
5
6
7
8
9
10
  public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}


public static Scheduler mainThread() { return getInstance().mainThreadScheduler; }


自动取消
AutoDispose
.as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
在Activity销毁时自己onDispose
不需要Disposable

1
2
3
4
5
6
7
8
9
10
11
12
class MyActivity {

var mDisposable: Disposable? = null

fun onCreate(savedInstanceState: Bundle?) {
var disposable = Observable.subscribeWith(aObserver)
mDisposable?.add(disposable)
}

fun onDestroy() {
mDisposable?.dispose()
}}

基础操作

rxjava

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 /**
     * 处理数组集合
     */
    public static void just() {
//        Observable observable=Observable.just("一二三四五","上山打老虎","老虎一发威","武松就发怵");
        Integer[] items1 = {1, 2, 3, 4, 5, 6};
        Integer[] items2 = {3, 5, 6, 8, 3, 8};
        Observable observable = Observable.just(items1, items2);//俩个数组变成一个了
        observable.subscribe(new Subscriber<Integer[]>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, e.getMessage());
            }

            @Override
            public void onNext(Integer[] o) {
                for(int i=0;i<o.length;i++){
                    Log.i(TAG,"next:"+o[i]);
                }
            }
        });
    }
  • Subscriber 对 Observer 接口进行了一些扩展, Subscriber 有 onStart()( 事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置 )、可以返回一个Subscription,取消订阅unsubscribe()

  • 创建被观察者create,from(数组),just(不变参数/俩个数组合并)等

  • 创建观察者:new Subscriber (),也可以new Action()(相当于onnext)rxjava2是 Consumer(肯书妹儿,消费者 ),最后的简化

操作符

map和flatmap

map
对集合或数据进行转化;
map只能单一转换,单一只的是只能一对一进行转换,指一个对象可以转化为另一个对象但是不能转换成对象数组(map返回结果集不能直接使用from/just再次进行事件分发,一旦转换成对象数组的话,再处理集合/数组的结果时需要利用for一一遍历取出(看下面例子),而使用RxJava就是为了剔除这样的嵌套结构,使得整体的逻辑性更强。)

flatmap
适用于嵌套结构(list,数组)
可以单一转换也可以一对多/多对多转换,flatmap 要求返回 Observable,因此可以再内部进行from/just的再次事件分发,一一取出单一对象(转换对象的能力不同)

举例说明
一个Student类中只有一个name,而一个Student却有多门课程(Course)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
List<Student> students = new ArrayList<Student>();
students.add...
...

Action1<List<Course>> action1 = new Action1<List<Course>>() {
@Override
public void call(List<Course> courses) {
//遍历courses,输出cuouses的name
for (int i = 0; i < courses.size(); i++){
Log.i(TAG, courses.get(i).getName());
}
}
};
Observable.from(students)
.map(new Func1<Student, List<Course>>() {
@Override
public List<Course> call(Student student) {
//返回coursesList
return student.getCoursesList();
}
})
.subscribe(action1);

使用flatMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<Student> students = new ArrayList<Student>();
students.add...
...

Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCoursesList());
}
})
.subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
Log.i(TAG, course.getName());
}
});

其他

merge:合并俩个任务;
filter:对集合或数据进行过滤;
toSortedList() :为事件中的数据排序
interval(1, TimeUnit.SECONDS) 创建一个每隔一秒发送一次事件的对象
timestamp() :为每个事件加上一个时间戳
take:取出集合中的前几个
skip:跳过前几个元素
each:遍历集合

1
2
3
4
5
6
7
8
9
10
11
       Observable.from(integers)
                .filter(new Func1<Integer, Boolean>() {
                    //泛型第一个是传入的参数,第二个是返回的参数
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 != 0;
                    }   
                //取前四个,从1开始
                .take(4)
                //取前四个中的后两个
                .takeLast(2)

concat操作符处理多数据源
concat会三个数据源都会请求的。如何使得哪层有数据就用哪层的,之后就不走后面的逻辑了。 
可以配合first()操作符来实现这样的效果。例如 对缓存进行检查,如:内存缓存、本地缓存、网络,哪一层有数据立即返回。
注意:Observable.concat只接受相同泛型的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.concat(memorySource, diskSource, networkSource)
                        .first()
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Action1<String>() {
                            @Override
                            public void call(String s) {
                                printLog(tvLogs, "Getting data from ", s);
                            }
                        }, new Action1<Throwable>() {
                            @Override
                            public void call(Throwable throwable) {
                                throwable.printStackTrace();
                                printLog(tvLogs, "Error: ", throwable.getMessage());
                            }
                        });

compose
减少一些相同的操作

1
2
3
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer)

zip 操作符
可以将多个 Observable (多个网络请求)的数据结合为一个数据源再发射出去。

flatMap 和 concatMap

  • concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。
  • concatMap和flatMap的功能是一样的, 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。只不过最后合并Observables flatMap采用的merge,而concatMap采用的是连接(concat)。
  • flatMap一般用来链式调用接口,请求完一个接口又请求另一个接口

subscribe和subscribeWith
一般subscribeWith用到使用Rx请求接口的这种情况,订阅后把请求参数返回回去,可以添加到CompositeDisposable中方便绑定Activity生命周期取消
subscribe返回值是void所以在请求接口时,最好还是使用subscribeWith,添加订阅关系更方便了

single没有继续

线程

  • Schedulers.newThread() 代表一个常规的新线程;通过内部的 Handler 把任务放到主线程去做。
  • AndroidSchedulers.mainThread() 代表Android的主线程
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用
  • Schedulers.io());默认线程数等于处理器的数量

比如传统的创建线程:

1
new Thread(new Runnable() { @Override public void run() { // TODO } }).start();

在RxJava中,我们可以这样实现,这样的好处是我们可以直接使用RxJava的线程调度逻辑。还可以在合适的时机终止线程

1
2
3
4
5
6
7
8
Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        // TODO
    }
});
worker.unsubscribe();

还有个好处是更容易的实现延迟执行和周期执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 延迟
Schedulers.computation().createWorker().schedule(new Action0() {
    @Override
    public void call() {
        // TODO
    }}, 500, TimeUnit.MILLISECONDS);


// 周期
Schedulers.computation().createWorker().schedulePeriodically(new Action0() {
    @Override
    public void call() {
        // TODO
    }}, 500, 250, TimeUnit.MILLISECONDS);

处理Activity的生命周期

1.在configuration改变(比如转屏)之后继续之前的Subscription。
比如你使用Retrofit发出了一个REST请求,接着想在listview中展示结果。如果在网络请求的时候用户旋转了屏幕怎么办?你当然想继续刚才的请求,但是怎么搞?
2.Observable持有Context导致的内存泄露
这个问题是因为创建subscription的时候,以某种方式持有了context的引用,尤其是当你和view交互的时候,这太容易发生!如果Observable没有及时结束,内存占用就会越来越大。
解决方案
在生命周期的某个时刻取消订阅。一个很常见的模式就是使用 CompositeDisposable 来持有所有的Subscriptions,然后在onDestroy()或者onDestroyView()里取消所有的订阅。mDisposable.dispose();

真实项目运用

链式调用,就不用调完一个回调再调另一个这样子了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
      Subscription subscription = LoginNetManager.getIP()
                .filter(new Func1<RspIqbDefault<IPModel>, Boolean>() {
                    @Override
                    public Boolean call(RspIqbDefault<IPModel> rsp) {
                        return rsp != null && rsp.isSuccess && rsp.getResultEntity() != null;
                    }
                })
                .flatMap(new Func1<RspIqbDefault<IPModel>, Observable<RspIqbDefault<SmsCodeLoginModel>>>() {
                    @Override
                    public Observable<RspIqbDefault<SmsCodeLoginModel>> call(RspIqbDefault<IPModel> rsp) {
                        IPModel entity = rsp.getResultEntity();
                        return VerifySmsNetManager.smsCodeLogin(entity.ip, code, phone_number, sms_id, action);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<RspIqbDefault<SmsCodeLoginModel>>() {
                    @Override
                    public void onCompleted() {
                    }


                    @Override
                    public void onError(Throwable e) {
                        if (mView != null) {
                            mView.smsCodeLogin(null, -1, null);
                        }
                    }

                    @Override
                    public void onNext(RspIqbDefault<SmsCodeLoginModel> rsp) {
                        if (rsp != null) {
                            if (mView != null) {
                                mView.smsCodeLogin(rsp.getResultEntity(), rsp.getErrorCode(), rsp.getErrorMessage());
                            }
                        } else {
                            if (mView != null) {
                                mView.smsCodeLogin(null, -1, null);
                            }
                        }
                    }
                });
        compositeSubscription.add(subscription);

RxBinding
基于RxJava的对于Android原生组件的绑定,是RxJava风格的,相当于代替了OnClick,Listener这些东西

doOnNext
doOnNext()函数执行的地方跟subcribe()中的onNext()执行的地方没有必然联系。 doOnNext()的执行在onNext()之前,可以对数据进行相关处理。例如网络解析返回来的数据先进行处理。也可以直接代替Subscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
   userInfoEntityObservable.doOnNext(new Action1<UserInfoEntity>() {
                    @Override
                    public void call(UserInfoEntity userInfoEntity) {
                        PublicMessage publicMsg = new PublicMessage(roomId);
                        EventBus.getDefault().post(event);
                    }
                }).subscribe();//doOnNext后面可以直接subscribe,不需要参数
//输出[1,2,3,4,5,6,7,8,9,10]中第三个和第四个奇数,take(i) 取前i个事件 takeLast(i) 取后i个事件 doOnNext(Action1) 每次观察者中的onNext调用之前调用,输出before onNext(),5before onNext(),7
Observable.from(number
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 != 0;
                    }
                })
                //取前四个,从1开始
                .take(4)
                //取前四个中的后两个
                .takeLast(2)
                .doOnNext(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        mText.append("before onNext()\n");
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        mText.append("onNext()--->" + integer + "\n");
                    }
                });
    }

PublishSubject

PublishSubject比较容易理解,相对比其他Subject常用,它的Observer只会接收到PublishSubject被订阅之后发送的数据。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
PublishSubject publishSubject = PublishSubject.create();
  publishSubject.onNext("publishSubject1");
  publishSubject.onNext("publishSubject2");
  publishSubject.subscribe(new Observer() {
          @Override
          public void onCompleted() {
          }

          @Override
          public void onError(Throwable e) {
          }

          @Override
          public void onNext(String s) {
              LogUtil.log("publishSubject observer1:"+s);
          }
      });
  publishSubject.onNext("publishSubject3");
  publishSubject.onNext("publishSubject4");

以上代码,Observer只会接收到”behaviorSubject3”、”behaviorSubject4”。

Transformer

将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象,和调用一系列的内联操作符是一模一样的。
举个简单的例子,写一个transformer()方法将一个发射整数的Observable转换为发射字符串的Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static <String> ObservableTransformer<Integer, java.lang.String> transformer() {
        return new ObservableTransformer<Integer, java.lang.String>() {
            @Override
            public ObservableSource<java.lang.String> apply(@NonNull Observable<Integer> upstream) {
                return upstream.map(new Function<Integer, java.lang.String>() {
                    @Override
                    public java.lang.String apply(@NonNull Integer integer) throws Exception {
                        return java.lang.String.valueOf(integer);
                    }
                });
            }
        };
    }

Observable.just(123,456)
       .compose(transformer())
       .subscribe(new Consumer<String>() {
              @Override
               public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                   System.out.println("s="+s);
                    }
                });

RxJava全局错误处理
在RxJava中rx.pluginsRxJavaPlugins这个类是用来处理发生的错误的,在RxJava的源码中用RxJavaPlugins.getInstance().getErrorHandler().handleError(e);这句来处理错误,这个默认是没有对错误进行任何处理的,我们需要自己来实现这个,
 RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { @Override public void handleError(Throwable e) { //在这里处理错误 } });


Rxjava
http://peiniwan.github.io/2024/04/60af8caa03c3.html
作者
六月的雨
发布于
2024年4月6日
许可协议