流行框架,流处理,逻辑清晰,网络处理方便,不再使用Handler,异步操作,简洁,哪怕是逻辑上面复杂,代码程度上面也能够做到简洁,与Retrofit2.0完美结合,多操作符。
1 | compile 'io.reactivex:rxjava:1.1.6' |
RXJava的使用
基本概念
RxJava基于的是观察者模式,类似订阅-发布,也类似Android的View的一些Listener监听
主要是有下面三个概念
- Observable 被观察者
- Observer 观察者,是一个接口
- Subscriber 观察者,继承自Observer,一般使用这个
- Subject 即时观察者也是被观察者,适合公共资源共享传递,比如做RxBus时候。
观察者与被观察之间是通过subscribe()方法联系起来。与一般的逻辑相反的是,这里是Observable.subscribe(Observer/Subscriber)
简单使用
- 建Observable对象
1 | Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { |
2 创建Subscriber
1 | Subscriber<String> subscriber = new Subscriber<String>() { |
3 两者关联起来
1 | observable.subscribe(subscriber); |
4 运行结果
1 | hello RxJava |
详细
Observable,就是一个需要被观察的对象,序列,生成的方式
首先,.Observable是什么东西已经知道了,在RxJava里面,他是最基本的类,1W多行ORZ,创建Observable的方式有很多种
create
1 | Observable.create(OnSubscribe) |
OnSubcribe是一个了是Listener之类的东西,暂时不管,只是知道他有一个call方法,可以通过该call方法回调Subscriber的onNext,onComplete,onError方法。
just
他可以接收1~9个参数,例子如下
1 | Observable.just("A","B","C").subscribe(new Subscriber<String>() { |
他会依次调用Subscriber的onNext,onNext,onComplete/onError,这里会打印
1 | A |
他的OnSubscribe类型是:
1 | OnSubscribeFromArray-->数组 |
from
接收一个Iterable对象(集合List)或者是数组对象,或者是一个线程的Future,例子如下
1 | Observable.from(new String[]{"A","B","C","D"}).subscribe(new Action1<String>() { |
他的 OnSubscribe类型是:
1 | OnSubscribeFromIterable -->Iterable |
从这里可以看出,just是最终的实现是调用了from操作符。
Defer
直到有订阅者倍subscribe的时候才会创建
1 | str = "Hello,RxJava"; |
输出的结果是:
1 | hi,RxJava |
当你使用别的操作符去生成observable的时候发现会是“Hello,RxJava”
他的OnSubscribe类型是
1 | OnSubscribeDefer --> 泛型 |
Interval
固定时间间隔发送整数序列的Observable,类似一个计数器,例子
1 | Observable.interval(1, TimeUnit.SECONDS,Schedulers.io()).subscribe(new Action1<Long>() { |
需要注意的是:在Android,在UI线程中去使用,需要制定interval的调度器
参数说明
1 | //第一个参数是延时多久开始,第二个参数是每隔多久计时,第三个参数是计时类型,比如TimeUnit .SECONDS,TimeUnit.HOURS等,第四个是调度器,其他重载的方法类似 |
range
创建一个特定序列的Observable,例子
1 | Observable.range(10, 5).subscribe(new Action1<Integer>() { |
当然你也可以知道调度器。其中,第一个是start,第二个参数是count,比如例子的意思就是从10开始,生产5个连续的int,就是10,11,12,13,14
1 | OnSubscribeRange --> start ,end |
repeat
重复发送使用一个Observable,例子
1 | Observable.range(10, 2).repeat(4).subscribe(new Action1<Integer>() { |
他默认是在一个新线程去运行的,需要注意的是repeat不是一个static方法,他是操作在一个Observable之上的。
timer
用于延迟多久之后开始某动作,例子
1 | Observable.timer(2,TimeUnit.SECONDS, Schedulers.newThread()).subscribe(new Action1<Long>() { |
empty ,never,error
用来处理特殊情况的Observable
1 | Observable.empty(); //用来发送不发射任何数据的但是正常结束的Observable |
他的使用就是你需要返回一个Observable对象,但是这个对象又不能为空,又不能去处理事情的时候就可以使用。
变换操作符
map
转换对象的时候使用,就是把一个Observable装换为另外一个Observable,例子
1 | Observable.just("A").map(new Func1<String, Integer>() { |
从这个例子我们就可以看出,这里是把一个关于String的Observable变换为了一个关于Integer的Observable,最后让被观察者得到。
map适合的范围是1对1的装换,不适合一对多的装换。
flatMap
一对多的转换,他的一版使用场景就是,比如去除for循环(map也行),比如需要把从一个对象去除它包含的List子对象然后迭代等,例子:
1 | 一个对象A中,包含了一个List<B\> ,现在需要取出B |
代码:
1 | Observable.just(test).flatMap(new Func1<Test, Observable<String>>() { |
使用Map是暂时无法做到这一点的,除非是在call内部使用一个for循环去做。
flatMap的图是:
concatMap
一个输出有序的Observable,他的用户跟flatMap差不多,只是,他是会按照顺序输出,flatMap不一定,这种情况是在你需要返回一个Observable
1 | Observable.from(test.list).flatMap(new Func1<String, Observable<String>>() { |
使用flatMap的话并不一定能保证顺序。在测试过程发现假如把Schedulers.from(Executors.newFixedThreadPool(1))替换为Schedulers.io()或者是Schedulers.newThread()等是顺序不会变的,目前还不知道为什么,就是使用自定义的线程池的时候会。
swichMap
同样是跟flatMap很类似,除了有一点,就是在异步环境下,当旧数据订阅没有完成,switchMap取消订阅和停止监视那个数据项产生的的Observable,比如下面
1 | Observable.from(test.list).switchMap(new Func1<String, Observable<String>>() { |
被监视的Observable是否取消决定于他是否已经被观察者得到,假如得到的话观察者是能看到这个值的,但是假如他没有被观察者得到,就会被取消订阅,并且不在被观察。
上面那个假如是写成
1 | Observable.from(test.list).switchMap(new Func1<String, Observable<String>>() { |
就会得到所有的输出。
case
转换操作符,他是强制制定类型的装换,没有map那么灵活,而且,当不能装换的时候,会爆出ClassCastException,例子
1 |
|
groupBy
分组 就是按照一种类型的key分组,例子如下
1 | Observable.range(10,10).groupBy(new Func1<Integer, Integer>() { |
输出是:
1 | 08-31 23:54:55.437 17504-17504/com.example.user.testproject I/RxJavaTest: 1====10 |
再比如
1 | Observable.just("A","A","B","AB","C","D").groupBy(new Func1<String,Boolean>() { |
输出是:
1 | 08-31 23:53:14.357 16194-16194/com.example.user.testproject I/RxJavaTest: true====A |
其中stringGroupedObservable.getKey()是获得分组的依据的表示,之后的那个是值。
Scan
他的作用是连续对数据序列的每一项应用一个func,然后连续发射结果。例子
1 | Observable.range(10,2).scan(new Func2<Integer, Integer, Integer>() { |
结果是:
1 | 09-01 21:40:45.112 22400-22400/com.example.user.testproject I/RxJavaTest: 10Subacriber |
他的第一个值是不参与func2的操作,当然你也可以传递一个init值进入作为默认操作。他的第一个参数是上一次计算的结果传入,第二个参数是被观察的序列值,从2开始,第三个是返回类型,三者是类型一样的。使用一个默认的种子计算的,比如
1 | Observable.range(10, 2).scan("A", new Func2<String, Integer, String>() { |
输出结果
1 | 09-01 21:44:51.572 27312-27312/? I/RxJavaTest: A |
flatMapIterable
他的作用是处理一些复杂的数据,接受一个Observable之后,返回一个Iterable,然后,这个Iterable会依次的传递给下面一层或者是Observer,例子
1 | List<String> list = new ArrayList<>(); |
输出
1 | 09-01 22:05:35.582 19199-19199/com.example.user.testproject I/RxJavaTest: 66==flatMapIterable |
buffer
该操作符的作用是先计算一定量的结果,之后再去回调结果给下一个Observable或者是Observer,例子
1 | Observable.just("A","B","C").buffer(2).subscribe(new Action1<List<String>>() { |
结果就是
1 | 09-01 22:10:41.272 24829-24829/com.example.user.testproject I/RxJavaTest: [A, B]==flatMapIterable |
这个方法是按照顺序不重复,当然他还有很多重载的操作,比如
1 | buffer(int count, int skip) // 一次缓存几个,之后每次跳过几个,按照顺序,重复,每次剔除最前面的 |
window
1 |
过滤操作符
filter
符合某种规则的Observable才会向下传递,例子
1 | Observable.range(100,10).filter(new Func1<Integer, Boolean>() { |
上面的Subscriber只会得到大于105的数值
first
只发射第一项,或者是满足条件的第一项,两个例子
1 | Observable.range(100,5).first();// 发射100 |
Debounce
当操作太频繁的时候,在第N个节点发射之前会忽略钱N-1个节点,比如我们的Edittext去seach的时候,可能需要过滤前多少秒的操作只是需要后续的操作。简单点来说,就是,他需要的是在某个时间点之后的结果,在这个时间点之前的结果他会忽略不再发送给下一级。例子:
1 | Observable.create(new Observable.OnSubscribe<Integer>() { |
输出就是6.7.8.9.10
总结就是debounce操作符就是类似一个控制入口,他是有一个门卡的,门卡一般是时间,在这之前的忽略,之后的可以进入
例子是来自 Android RxJava使用介绍(三) RxJava的操作符
其中,还有一个throttleWithTimeout操作符,他的底层也是通过调用debounce操作符完成的,作用类似。但是debounce比她在、多了一个功能就是可以通过函数来进行限流操作。例子如下:
1 | Observable.just(1, 2, 3, 4, 5).debounce(new Func1<Integer, Observable<Integer>>() { |
结果输出是2.4.5。5为什么在其中?因为,在debounce操作符中,最后一个Observable,在被调用onComplete之前的时间间隔内被调用,那么他也会被传递到下一级。
Distinct
只允许没有发射过的数据发射,可以定义过滤规则,例子
1 | Observable.range(1,5).distinct(new Func1<Integer, String>() { |
输出
1 | 09-02 00:32:45.009 4762-4762/? I/RxJavaTest: 1==自定义 |
elementAt
某一位置的元素可以发射,需要需要越界的问题,他是从0开始计算的例子
1 | Observable.range(10,10).elementAt(9).subscribe(new Action1<Integer>() { |
last
最后一个或者满足条件的最后一个,跟first相反,例子
1 | Observable.range(10,5).last(); |
skip
跳过前N项,从第N+1开始发射,例子
1 | Observable.range(10,20).skip(10); |
这个例子是丢弃了1029中的1019,保留20~29,还有两个重载的方法
1 | skip(long,TimeUnit));// 跳过前long时间的Observable,发射之后的 |
结果是14,两个都是
skipLast
不发射后n项数据,跟skip相反,例如
1 | Observable.range(1,10).skipLast(3).subscribe(new Action1<Integer>() { |
输出:1,2,3,4,5,6,7
take
只取前n项数据,例如有10项数据,那么take(4),就是只是圈前4项数据,例如
1 | Observable.range(10,10).take(5).subscribe(new Action1<Integer>() { |
结果是:10,11,12,13,14
taskLast
只发射最后n项数据,比如有10项数据,taskLast(3),则是只发射最后3项数据,例子:
1 | Observable.range(9,5).takeLast(2).subscribe(new Action1<Integer>() { |
输出:12和13
需要注意的是他是一个线程同步的,他可能造成Observable发射延迟,因为他只有过滤掉前面的数据才会轮到后面,然后后面的数据才会发射。
重构的方法有:
1 | takeLast(final int count) //发射后面n个数据,n大于数据全部的时候则发射全部n小于0报异常 |
同时,他还有一个变形,*takeLastBuffer**,takeLast是一个一个数据源返回,但是takeLastBuffer是以List的形式返回,例如:
1 | Observable.range(20,10).takeLastBuffer(5).subscribe(new Action1<List<Integer>>() { |
ignoreElements
忽略所有的元素,只是发射结束事件或者是错误时间,例如
1 | Observable.range(10,10).ignoreElements().subscribe(new Subscriber<Integer>() { |
last
只发射最后的一项数据,忽略前面n项数据,例如
1 | Observable.range(10,10).last().subscribe(new Action1<Integer>() { |
结果是:19
还有一个last(Func1)的重载函数,他可以规定那个才是last,比如
1 | Observable.range(10,10).last(new Func1<Integer, Boolean>() { |
在subscribe里面输出的就是14而不是19,还有一个相似函数是:lastOrDefault(T),他会在Observable没有发射任何数据的时候发射默认值T。
sample操作符
他是在某一段时间内采集过去发射的旧数据再次发射,比如:
1 | Observable.interval(1,TimeUnit.SECONDS).sample(2,TimeUnit.SECONDS).subscribe(new Action1<Long>() { |
interval创造一个Observable之后每个1s发射一个新数据,然后sample会在2s重复的去采集已经发射的旧数据再次发射,输出的结果是:0,2,4,6…
它默认kaiq新线程,我们也可指线程给sample运行
使用
结合操作符
组合操作符的作用是可以结合多个Observable进行操作。
CombineLatest 操作符
他可以组合两个Observable,进行一定的操作之后,再次发射下去,例如:
1 | Observable.combineLatest(Observable.range(5,2), Observable.range(10, 4), new Func2<Integer, Integer, String>() { |
结果是:
1 | 10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==10=combineLatest |
它继续发射的前提是:其中的一个Observable还有数据没有发射,那么,他讲两个Observable目前最新发射的数据组合在一起,比如上面,第一个Observable最新的数据是6,然后第二个的依次在变,然后再把他们组合在一起。
重载方法
1 | combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) |
他们都是输入一堆的List<Observable
1 | List<Observable<Integer>> list = new ArrayList<>(); |
join组合操作符
他的声明如下:
1 | public final <TRight, TLeftDuration, TRightDuration, R> Observable<R> join(Observable<TRight> right, Func1<T, Observable<TLeftDuration>> leftDurationSelector, |
例子调用:
1 | Observable<String> left = |
例子来源:RxJava 教程第四部分:并发 之意外情况处理
结果输出:
1 | L0== |
可以看到,在right join如left之后,她会结合每一次left的发射的Observable,然后再次发射,但是他的前提是left窗口还有数据在发射。假如left窗口没有数据了,那么right窗口也就不会再去跟left窗口接口再去发射了,比如:
1 | Observable.range(10,10).join(Observable.range(10, 2), new Func1<Integer, Observable<Object>>() { |
将不会有输出。
图:
merge 组合操作符
用于合并多个Observable,他们需要同类型,按照前到后的顺历依次发射所有的Observable例子:
1 | Observable.merge(Observable.range(10,2),Observable.range(20,3)).subscribe(new Action1<Integer>() { |
输出:
1 | 10-19 15:47:39.973 2140-2140/? I/RxJavaTest: 10merge |
图:
zip 组合操作符
用于多两个Observable进行再次操作之后再次发射,他是有顺序的,他会按照顺序去结合多个Observable之间的数据,按照最短的数据为zip的func2的调用次数,例子
1 | Observable.zip(Observable.range(10, 10), Observable.range(5, 2), new Func2<Integer, Integer, String>() { |
输出:
1 | 10-19 15:57:57.482 11123-11123/com.example.user.testproject I/RxJavaTest: 10-5 |
图:
zipWith
跟zip类型,但是他是非静态的,需要在另外一个Observable操作之上,他接受两个参数,一种是一个Observable和Func2,另外一个是多个组合, Iterable和Func2例子:
1 | Observable.range(10,2).zipWith(Observable.range(10,1),new Func2<Integer,Integer,String>(){ |
switchOnNext
将一个发射多个Observable对象装换为一个Observable发射
1 | Observable.switchOnNext(Observable.just(Observable.range(1,1),Observable.range(2,1))).subscribe(new Action1<Integer>() { |
输出:
1 | 10-19 16:13:26.296 24641-24641/? I/RxJavaTest: 1switchOnNext |
图:
startWith
在源Observable之前插入一个或者是多个数据,例如
1 | Observable.range(5,2).startWith(6).subscribe(new Action1<Integer>() { |
结果:
1 | 10-19 16:16:10.690 27055-27055/? I/RxJavaTest: 6switchOnNext |
图:
错误处理操作符
错误处理操作符主要是在Observable的onError中拦截,做一些事情。
catch操作符
1 onErrorReturn 让Observable遇到错误时候发生一个特殊的数据并且正常终止,比如
1 | Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, String>() { |
输出结果:
1 | 10-26 13:25:46.623 19493-19493/fortunetelling.mmc.oms.rxjavademo I/MainActivityLOG: Result == 1str |
说明:onErrorReturn操作符他会拦截错误,然后在拦截错误之后,不再继续发射源Observable的数据,同时,他将在Func1中返回一个对应的可被观测的数据传递到onNext中,然后调用了onComplete方法完成这一次的Observable过程。
2 onErrorResumeNext 让Observable遇到错误时开始发射第二个Observable数据序列,例子
1 | Observable<String> observable = Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, String>() { |
结果:
1 | 10-26 13:40:56.843 5987-5987/fortunetelling.mmc.oms.rxjavademo I/MainActivityLOG: onErrorResumeNext == 1str |
可以看到他是终止了继续发射,但是他可以补充发射,就是在遇到错误的时候,可以在源Observable基础上面继续发射多个Observable直到结束。
他的重载函数:
1 | Observable<T> onErrorResumeNext(final Observable<? extends T> resumeSequence) |
使用也是类似,他也是可以在遇到错误之后发射多个Observable。
3 onExceptionResumeNext 让Observable遇到错误时发射继续发射后面的数据项,例子
1 | Observable<String> observable = Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, String>() { |
输出:
1 | 10-26 13:49:21.803 16041-16041/fortunetelling.mmc.oms.rxjavademo I/MainActivityLOG: onNext1str |
加入不是抛出Error,而是Exception的子类的时候,则结果是:
1 | 10-26 13:50:57.643 17881-17881/fortunetelling.mmc.oms.rxjavademo I/MainActivityLOG: onNext1str |
说明onExceptionResumeNext操作符是在上级源Observable出现的是Exception的子类的时候调用他自己的Observable继续发射下去,同时是不回再去发射源Observable的数据。但是假如上级源Observable出现的不是Exception的子类的时候,那么他会直接调用onError方法结束发射。
对应的图:
onErrorReturn:
onErrorResumeNext:
onExceptionResumeNext:
retry操作符
retry操作符不会把错误传递到Observer的onError当中去。他可以不停的retry,尝试能够让源Observable正常发射结束。
1 retry 操作符,他会在源Observable在发生error的时候不断的重新去调用源数据,重头开始发射,存在了数据可能重复的结果,例如:
1 | Observable<String> observable = Observable.range(10, 3).map(new Func1<Integer, String>() { |
结果是不断的打印
1 | 10retry |
重载函数:
1 | Observable<T> retry(final long count) |
他可以设置重试次数,超出重试次数之后任然发生错误,那么他将会调用onError方法,例子:
1 | Observable<String> observable = Observable.range(10, 3).map(new Func1<Integer, String>() { |
输出:
1 | 10-26 14:22:50.137 4099-4099/com.example.user.testproject I/RxJavaTest: 10retry |
重载函数
1 | retry(Func2<Integer, Throwable, Boolean> predicate) |
他可以设定某种界限,在不超出某个条件之下不断的重试,或者说只要是返回了true都会重试,反之停止,停止之后还是发生了错误,就会调用onError方法,例如
1 | Observable<String> observable = Observable.range(10, 3).map(new Func1<Integer, String>() { |
他的意思是,在重试次数少于3的时候就会重试,否则就不会,输出:
1 | 10-26 14:28:34.882 8577-8577/com.example.user.testproject I/RxJavaTest: 10retry |
图:
2. retryWhen
1 | Observable<String> observable = Observable.range(10, 3).map(new Func1<Integer, String>() { |
输出:
1 | 10-26 15:06:44.358 26227-26227/com.example.user.testproject I/RxJavaTest: 10retry |
说明:这里使用了zipWith限制了retryWhen的次数,当超出了三次之后,zipWith回隐式的调用onComplete方法,所以这里的Observer的onError方法并不会被调用。
例子来源:【译】对RxJava中.repeatWhen()和.retryWhen()操作符的思考
关于:
1 | Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler) |
中的notificationHandler的Observable返回说明:
1 | – 如果返回的 Observable 发射了一个数据,retryWhen 将会执行重试操作 |
出处:RxJava 教程第三部分:驯服数据流之 高级错误处理
也就是说,其实retryWhen的Func1返回的Observable的值并不重要,是他的类型重要,他只是判断是否可以继续执行retryWhen,例如:
1 | Observable<Integer> source = Observable.create(o -> { |
结果:
1 | TimeInterval [intervalInMilliseconds=21, value=1] |
出处:RxJava 教程第三部分:驯服数据流之 高级错误处理
他还有一个重载的操作符可以指定调度器Scheduler
1 | Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) |
辅助操作符
RxJava提供很多的辅助操作符,帮助我们更好的堆Observable进行更加方便的操作。
meterailize/Dematerialize 操作符
meterailize操作符被一个合法有限的Observable调用的时候,如果Observable调用Observer的onNext次数为0或者多次,omComplete次数为1次,或者onError1次。meterailize会将不管是onNext还是onComplete还是onError包装成为Observable发射,而Dematerialize的作用则是相反。meterailize使用的时候他会包装Observable的数据源为一个Notification
1 | Observable.range(10,3).map(new Func1<Integer, Notification<Integer>>() { |
输出就是:
1 | 10-27 16:35:43.962 4606-4606/com.example.user.testproject I/RxJavaTest: materialize[rx.Notification@26788166 OnNext [rx.Notification@133c40b8 OnNext 10]] |
可以看到他是多了一个omCompleted事件的,但是这个传递到onNexu的时候他的值是为null的,所以使用的时候注意。
假如是使用Dematerialize操作符,他是把materialize数据源包装的Notification
1 | Observable.range(10, 3).map(new Func1<Integer, Notification<Integer>>() { |
输出是:
1 | 10-27 16:42:13.563 10276-10276/? I/RxJavaTest: dematerialize = 10 |
还把最后一个空的过滤了。
图片:
delay 操作符
延迟发射Observable。
1 | Observable<T> delay(long delay, TimeUnit unit) |
该方法会延时整一个observable,但是延时结束之后会立马开始发射,之后的数据将不会再延时(同时,以下的其他操作符也是基于这个observable的)例如:
1 | Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { |
输出:
1 | 10-30 16:12:58.535 3044-3044/com.example.user.testproject I/RxJavaTest: 发射 |
需要注意的是:
当第一个发射的就是onError的时候,该延时将不会起效,立马就会发射给Observer,但是假如是发射onComplete的时候还是会延时的。
1 | <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> itemDelay) |
他可以给给一个Observable给定一个延时,当Func1返回的Observable发射的时候,源数据对应的数据就会发射,比如:
1 | observable.delay(2, TimeUnit.SECONDS); |
结果“
1 | 10-30 16:31:12.929 发射 |
这里,A是延时了1s发射,但是其他的数据是延时了约2s发射。注意的是他不是有叠加的效果,不会是让上一个Observable发射了之后再去延时下一个。
图:
delaySubscription 操作符
他是延迟Observable中OnSubscribe调用call的时间
1 | <U> Observable<T> delaySubscription(Func0<? extends Observable<U>> subscriptionDelay) |
他根据Func0返回的Observable延迟Observable和Observer之间关联,例如
1 | observable.delaySubscription(new Func0<Observable<String>>() { |
输出:
1 | 10-30 16:42:28.510 28979-28979/com.example.user.testproject I/RxJavaTest: 延时订阅开始 |
可以看到延时开始后之后是经过了约2s的时间之后才发生了订阅的关系。
重载函数:
1 | public final Observable<T> delaySubscription(long delay, TimeUnit unit) |
该函数的作用也是类似的。
图:
do系列操作符
doOnEach
1
Observable<T> doOnEach(final Action1<Notification<? super T>> onNotification)
他提供一个Action1的谓词,用于在Observable发射的时候同时也被转换成一个Notification,该Notification中携带了被发射的value,相当于可以多个Observer被调用,例子:
1
2
3
4
5
6
7
8
9
10
11observable.doOnEach(new Action1<Notification<? super String>>() {
@Override
public void call(Notification<? super String> notification) {
Log.i(TAG, "notification = " + notification.getValue());
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "doOnEach = " + s);
}
});输出:
1
2
3
4
5
6
7
810-31 15:31:40.565 2396-2396/com.example.user.testproject I/RxJavaTest: 发射
10-31 15:31:40.565 2396-2396/com.example.user.testproject I/RxJavaTest: notification = A
10-31 15:31:40.565 2396-2396/com.example.user.testproject I/RxJavaTest: doOnEach = A
10-31 15:31:40.565 2396-2396/com.example.user.testproject I/RxJavaTest: notification = B
10-31 15:31:40.565 2396-2396/com.example.user.testproject I/RxJavaTest: doOnEach = B
10-31 15:31:40.565 2396-2396/com.example.user.testproject I/RxJavaTest: notification = C
10-31 15:31:40.565 2396-2396/com.example.user.testproject I/RxJavaTest: doOnEach = C
10-31 15:31:40.565 2396-2396/com.example.user.testproject I/RxJavaTest: notification = null其中最后一个是null代表的onComplete函数被调用。
重载函数:1
Observable<T> doOnEach(Observer<? super T> observer)
他的作用也是类似的,只不过doOnEach参数中的observer的onNext,onComplete,onError也是同样会被调用,例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21observable.doOnEach(new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted" );
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError = " + e.getMessage());
}
@Override
public void onNext(String s) {
Log.i(TAG, "doOnEach = " + s);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "subscribe = " + s);
}
});输出:
1
2
3
4
5
6
7
810-31 15:34:05.038 4612-4612/com.example.user.testproject I/RxJavaTest: 发射
10-31 15:34:05.038 4612-4612/com.example.user.testproject I/RxJavaTest: doOnEach = A
10-31 15:34:05.038 4612-4612/com.example.user.testproject I/RxJavaTest: subscribe = A
10-31 15:34:05.038 4612-4612/com.example.user.testproject I/RxJavaTest: doOnEach = B
10-31 15:34:05.038 4612-4612/com.example.user.testproject I/RxJavaTest: subscribe = B
10-31 15:34:05.038 4612-4612/com.example.user.testproject I/RxJavaTest: doOnEach = C
10-31 15:34:05.038 4612-4612/com.example.user.testproject I/RxJavaTest: subscribe = C
10-31 15:34:05.038 4612-4612/com.example.user.testproject I/RxJavaTest: onCompleted图:
doOnNext
1
Observable<T> doOnNext(final Action1<? super T> onNext)
他的作用只是会回调被发射源数据的onNext函数而不是全部,例如:
1
2
3
4
5
6
7
8
9
10
11observable.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "doOnNext = " + s);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "subscribe = " + s);
}
});输出:
1
2
3
4
5
6
710-31 15:36:34.501 6801-6801/com.example.user.testproject I/RxJavaTest: 发射
10-31 15:36:34.501 6801-6801/com.example.user.testproject I/RxJavaTest: doOnNext = A
10-31 15:36:34.501 6801-6801/com.example.user.testproject I/RxJavaTest: subscribe = A
10-31 15:36:34.501 6801-6801/com.example.user.testproject I/RxJavaTest: doOnNext = B
10-31 15:36:34.501 6801-6801/com.example.user.testproject I/RxJavaTest: subscribe = B
10-31 15:36:34.501 6801-6801/com.example.user.testproject I/RxJavaTest: doOnNext = C
10-31 15:36:34.501 6801-6801/com.example.user.testproject I/RxJavaTest: subscribe = C图:
doOnSubscribe操作符
1
Observable<T> doOnSubscribe(final Action0 subscribe)
操作符注册一个动作,当观察者订阅它生成的Observable它就会被调用。例如:
1
2
3
4
5
6
7
8
9
10
11observable.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.i(TAG, "doOnSubscribe被调用" );
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "subscribe = " + s);
}
});输出:
1
2
3
4
510-31 15:39:23.039 9254-9254/com.example.user.testproject I/RxJavaTest: doOnSubscribe被调用
10-31 15:39:23.039 9254-9254/com.example.user.testproject I/RxJavaTest: 发射
10-31 15:39:23.039 9254-9254/com.example.user.testproject I/RxJavaTest: subscribe = A
10-31 15:39:23.039 9254-9254/com.example.user.testproject I/RxJavaTest: subscribe = B
10-31 15:39:23.039 9254-9254/com.example.user.testproject I/RxJavaTest: subscribe = C适合做一些在发射数据之前的操作。图:
doOnUnsubscribe操作符
1
Observable<T> doOnUnsubscribe(final Action0 unsubscribe)
当观察者取消订阅它生成的Observable它就会被调用,比如调用了onComplete函数或者是Observer和Observa之间取消了关联的时候被调用,例如:
1
2
3
4
5
6
7
8
9
10
11observable.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.i(TAG, "doOnUnsubscribe" );
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "subscribe = " + s);
}
});输出:
1
2
3
4
510-31 15:41:26.559 11057-11057/com.example.user.testproject I/RxJavaTest: 发射
10-31 15:41:26.559 11057-11057/com.example.user.testproject I/RxJavaTest: subscribe = A
10-31 15:41:26.559 11057-11057/com.example.user.testproject I/RxJavaTest: subscribe = B
10-31 15:41:26.559 11057-11057/com.example.user.testproject I/RxJavaTest: subscribe = C
10-31 15:41:26.559 11057-11057/com.example.user.testproject I/RxJavaTest: doOnUnsubscribe适合做一些收尾的工作,图:
doOnCompleted和doOnError操作符
1
2Observable<T> doOnCompleted(final Action0 onCompleted)
Observable<T> doOnError(final Action1<Throwable> onError)会在Observa调用对应的函数之前被调用。注意是之前被调用,例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21observable.doOnCompleted(new Action0() {
@Override
public void call() {
Log.i(TAG, "doOnCompleted " );
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "subscribe = " + s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.i(TAG, "throwable " + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
Log.i(TAG, "Completed ");
}
});正常结束的时候结果是:
1
2
3
4
5
610-31 15:47:59.653 16908-16908/com.example.user.testproject I/RxJavaTest: 发射
10-31 15:47:59.653 16908-16908/com.example.user.testproject I/RxJavaTest: subscribe = A
10-31 15:47:59.653 16908-16908/com.example.user.testproject I/RxJavaTest: subscribe = B
10-31 15:47:59.653 16908-16908/com.example.user.testproject I/RxJavaTest: subscribe = C
10-31 15:47:59.653 16908-16908/com.example.user.testproject I/RxJavaTest: doOnCompleted
10-31 15:47:59.653 16908-16908/com.example.user.testproject I/RxJavaTest: Completed把observable最后调用改为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.i(TAG, "发射");
subscriber.onNext("A");
subscriber.onNext("B");
subscriber.onNext("C");
// subscriber.onCompleted();
subscriber.onError(new IllegalArgumentException("error"));
}
});
observable.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.i(TAG, "doOnError = " + throwable.getMessage());
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "subscribe = " + s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.i(TAG, "throwable " + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
Log.i(TAG, "Completed ");
}
});输出是:
1
2
3
4
5
610-31 15:52:01.173 20513-20513/com.example.user.testproject I/RxJavaTest: 发射
10-31 15:52:01.173 20513-20513/com.example.user.testproject I/RxJavaTest: subscribe = A
10-31 15:52:01.173 20513-20513/com.example.user.testproject I/RxJavaTest: subscribe = B
10-31 15:52:01.173 20513-20513/com.example.user.testproject I/RxJavaTest: subscribe = C
10-31 15:52:01.173 20513-20513/com.example.user.testproject I/RxJavaTest: doOnError = error
10-31 15:52:01.173 20513-20513/com.example.user.testproject I/RxJavaTest: throwable error图:
doOnTerminate操作符
在Observable发射终止之前调用,无论是正常还是错误的结束,就是相当于doOnCompleted和doOnError两个的结合体
例子就不写了,图:doAfterTerminate操作符
在Observable终止之后会被调用,也就是在操作符doOnTerminate之后被调用,在doOnUnsubscribe之后被调用
图:finallyDo操作符
该操作符已经被废弃,但是他的作用是当它产生的Observable终止之后会被调用,无论是正常还是异常终止。会在doOnUnsubscribe操作调用之后调用,例如:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16observable.finallyDo(new Action0() {
@Override
public void call() {
Log.i(TAG, "finallyDo ");
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.i(TAG, "doOnUnsubscribe ");
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});输出:
1
2
310-31 15:58:56.166 26661-26661/com.example.user.testproject I/RxJavaTest: 发射
10-31 15:58:56.166 26661-26661/com.example.user.testproject I/RxJavaTest: doOnUnsubscribe
10-31 15:58:56.166 26661-26661/com.example.user.testproject I/RxJavaTest: finallyDo图:
serialize操作符
1 | Observable<T> serialize() |
他的作用主要是维持保证Observable能够正常的结束调用。比如,由于我们的Observable发射数据的时候可能是异步的,那么他们就存在着一种这样子的关系:我前面的数据正常调用onNext函数,然后在某一个异步中调用onComplete函数需要正常结束掉这个Observable,但是还有一些没有完成的异步任务,他们完成之后还会调用onNext函数,但是这回导致Observable的调用不正确,我们需要维持这个Observable正常结束,就可以使用serialize。例如
1 | Observable<String> ob = Observable.create(new Observable.OnSubscribe<String>() { |
这时候的输出是正常的,是:
1 | 11-01 14:01:36.273 8186-8186/? I/RxJavaTest: A |
这是正常的原因是Observer接受到onComplete的时候结束掉了订阅的关系,但是当我们使用unsafeSubscribe操作符,该操作符不会使得Observer主动结束订阅关系的时候去订阅Observable的例子:
1 | ob.doOnUnsubscribe(new Action0() { |
输出的结果是:
1 | 11-01 14:01:01.290 7672-7672/com.example.user.testproject I/RxJavaTest: A |
可以看到他是没有正常结束的,应该是发射了C之后就结束的,同时也没有调用unsafeSubscribe函数,这时候我们需要维持正常,就可以使用serialize谓词,例如:
1 | ob.doOnUnsubscribe(new Action0() { |
输出结果:
1 | 11-01 14:03:08.302 9525-9525/com.example.user.testproject I/RxJavaTest: A |
这时候看到的是正常结束了。
图:
ObserveOn操作符
1 | Observable<T> observeOn(Scheduler scheduler) |
该操作符用于指定Observer在那个调度器上面去观察Observable,也就是Observable发射数据之后,在Observer中处理时候的线程。例如:
1 | Log.i(TAG, Thread.currentThread().getName()); |
输出结果:
1 | 11-01 14:30:00.235 882-882/com.example.user.testproject I/RxJavaTest: main |
可以看到Observer处理的线程就是我们通过observeOn操作符给他定义的线程
图:
SubscribeOn操作符
1 | Observable<T> subscribeOn(Scheduler scheduler) |
用于指定Observable运行call的时候的线程,或者说是Observable自身所在的线程。例如:
1 | Log.i(TAG, Thread.currentThread().getName()); |
输出:
1 | 11-01 14:33:03.439 3927-3927/com.example.user.testproject I/RxJavaTest: main |
可以看到Observable所在的线程是我们给他指定的线程,Observer默认不在任何调度器上面执行,所以也跟着在这个线程上面执行,我们可以这样子指定:
1 | Log.i(TAG, Thread.currentThread().getName()); |
这样子Observer就可以在指定的不用于Observable所在的线程,注意这里需要引入RxAdnroid。
图:
subscribe操作符
1 | Subscription subscribe() //只是需要让Observable和Observer发生关联,不需要处理Observable发射的数据 |
就是平常我们使用最多的一个操作符,用于关联Observable和Observer。他的例子就不写了。
图:
timeInterval操作符
1 | Observable<TimeInterval<T>> timeInterval() |
他的作用是把源Observable发射的数据T装换为一个TimeInterval
例如:
1 | Observable<String> ob = Observable.create(new Observable.OnSubscribe<String>() { |
结果输出:
1 | 11-01 14:53:02.878 21735-21749/? I/RxJavaTest: 前后相差时间:2 值:1 |
图:
Timeout操作符
第一种:
1 | Observable<T> timeout(long timeout, TimeUnit timeUnit) |
该操作符在两个Observable数据发射之前的时间间隔超出了timeout,就会发射一个TimeoutException给Observer的onError。例如:
1 | Observable<String> ob = Observable.create(new Observable.OnSubscribe<String>() { |
输出结果:
1 | 11-01 15:20:03.978 13695-13710/? I/RxJavaTest: A |
从这里可以看到是两个Observable数据发射的时间间隔,而不是总体的发射时间。
第二种:
1 | Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other) |
这个函数可以在超时之后发射一个Observable座位补充,他是接着在源的数据之后发射,例如:
1 | Observable<String> ob = Observable.create(new Observable.OnSubscribe<String>() { |
输出:
1 | 11-01 15:25:24.379 18453-18468/? I/RxJavaTest: A |
这样子就可以让Observable顺利发射完成。
第三种:
1 | Observable<T> timeout(Func1<? super T, ? extends Observable<V>> timeoutSelector) |
这个谓词,他返回一个跟源Observable对应的Observable,当该谓词的Observable发射终止,源Observable还没有发射数据的时候,就发射一个TimeoutException给Observer的onError。第二个函数是不发射给onError,而是在源数据的基础之上继续发射替补的other。例如:
1 | Observable<String> ob = Observable.create(new Observable.OnSubscribe<String>() { |
输出结果:
1 | 11-01 15:36:21.216 28118-28142/com.example.user.testproject I/RxJavaTest: A |
第二个函数也是类似的。
第四种:
1 | Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<V>> timeoutSelector) |
这是可以给第一个Observable的数据设置超时,即当源Observable的第一个数据没有在firstTimeoutSelector的终止之前发射就会发射TimeoutException。其他的是跟上面类似,例子就不写了。
图:
Timestamp操作符
1 | Observable<Timestamped<T>> timestamp() |
他的作用是把源数据包装一次,让它带有发射时间,包装之后的数据是Timestamped
1 | Observable<String> ob = Observable.create(new Observable.OnSubscribe<String>() { |
输出结果:
1 | 11-01 15:49:09.785 7365-7379/? I/RxJavaTest: timestampTimestamped(timestampMillis = 1478015349785, value = A) |
图:
Using操作符
1 | Observable<T> using( |
创建一个只在Observable生命周期内存在的一次性资源,需要释放的资源,比如:
1 | Observable.using(new Func0<String>() { |
其中第一个参数是创建资源,第二个参数是创建资源对应的Observable对象,第三个参数是释放资源。他适合与一些全局的一些需要被及时回收而我们需要确定他回收的资源。
图:
布尔操作符
我们可以使用布尔操作符,对源数据Observable发送的数据进行变换或者是直接的布尔操作,来达到一些目的。
all 操作符
1 | Observable<Boolean> all(Func1<? super T, Boolean> predicate) |
用于判断Observable的所有数据是否满足某一个条件
例如:
1 | Observable.range(10,3).all(new Func1<Integer, Boolean>() { |
输出是true,加入把条件改为
1 | @Override |
那么就是输出false了。
图:
amb 操作符
1 | Observable<T> amb(Iterable<? extends Observable<? extends T>> sources) |
amb操作符的作用是用于比较两个Observable,然后哪一个先发射数据就只是发射这个Observable的数据,抛弃其他的Observable数据,不论他第一个是发射了onError还是onComplete,例子:
1 | Observable<Integer> o1 = Observable.range(20, 1).delay(200, TimeUnit.MILLISECONDS); |
输出:
1 | amb==10 |
有一个ambWith操作符:
1 | Observable<T> ambWith(Observable<? extends T> t1) |
他是一个非静态的,作用跟上面的一样。
图:
contains 操作符
1 | Observable<Boolean> contains(final Object element) |
该操作符的作用是用于判断源数据时候包含某一个元素,例如:
1 | Observable.range(10,5).contains(11).subscribe(new Action1<Boolean>() { |
输出是true,因为源数据的范围是10-14之间,包含11.
图:
defaultIfEmpty 操作符
1 | Observable<T> defaultIfEmpty(final T defaultValue) |
该操作符的作用是在源Observable没有发射数据的时候发射一个默认数据,但是有onComplete()调用,表明他已经发射数据完毕,在没有数据发射的时候调用了onError是不会发射默认数据的,例如:
1 | Observable<String> ob = Observable.create(new Observable.OnSubscribe<String>() { |
结果是:A,假如是发射onError的时候则是只输出“error”,而不会输出A
图:
sequenceEqual 操作符
1 | Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second) |
第一个函数,用于判断输入的两个Observable是否相同,包括发射长度,顺序,值,终止状态,例如:
1 | Observable.sequenceEqual(Observable.just("A","B"),Observable.just("A","B")).subscribe(new Action1<Boolean>() { |
输出就是true,把第二个Observable改为Observable.just(“B”,”A”),输出就是false。
终止状态不同的时候,比如:
1 | Observable<String> o1 = Observable.create(new Observable.OnSubscribe<String>() { |
就会输出了error。
第二个函数,他多了一个参数,是用于比较,自己确定规则的那种比较,两个Observable的值是否相同。例如:
1 | Observable.sequenceEqual(Observable.just("A"), Observable.just("A"), new Func2<String, String, Boolean>() { |
就是输出true,
图:
skipUntil 操作符
1 | Observable<T> skipUntil(Observable<U> other) |
用于废弃源Observable,知道other操作符符发射数据,例如:
1 | Observable<String> s1 = Observable.create(new Observable.OnSubscribe<String>() { |
输出结果是只输出了D,以为前面的C,E在other发射数据之前已经发射,所以会被丢弃。
图:
skipWhile 操作符
1 | Observable<T> skipWhile(Func1<? super T, Boolean> predicate) |
他的作用在于可以自己定规则,忽略前面不符合条件的数据,但是当一旦返回了true,该数据就不会被发射,然后predicate谓词还是会被调用,但是当predicate返回了false之后就不会再次被调用。例如:
1 | Observable<String> s1 = Observable.create(new Observable.OnSubscribe<String>() { |
输出:
1 | 11-02 14:08:21.526 29654-29654/com.example.user.testproject I/RxJavaTest: skipWhile |
可以看到C没有输出,而且调用了skipWhile两次,skipWhile返回了false之后就没有被再次调用而是直接发射了源Observable后面的数据。
图:
TakeUntil 操作符
1 | Observable<T> takeUntil(Observable<? extends E> other) |
他的作用在于other在发射一个数据或者是onComplete的时候,回去终止源Observable发射数据,例如:
1 | Observable<String> o1 = Observable.create(new Observable.OnSubscribe<String>() { |
输出结果:
1 | 11-02 14:24:02.505 11375-11396/com.example.user.testproject I/RxJavaTest: A |
可以看到other是在100ms之后再去发射数据的,observable在100ms之后发射的数据会被忽略。图:
TakeWhile 操作符
1 | Observable<T> takeWhile(final Func1<? super T, Boolean> predicate) |
他的作用是通过predicate函数返回一个boolean值,假如是返回了true,则发射该数据,但是一旦返回来false,就终止源Observable的发射,同时调用他的onComplete()方法,例子:
1 | Observable<String> o1 = Observable.create(new Observable.OnSubscribe<String>() { |
输出结果:
1 | 11-02 14:32:00.313 18417-18417/com.example.user.testproject I/RxJavaTest: takeWhile==A |
因为第一个数据就使得predicate返回了false,所以终止了发射,假如predicate换成:
1 | @Override |
则会输出全部,图: