RxJava2 实战系列文章

RxJava2 实战知识梳理(1) – 后台执行耗时操作,实时通报 UI
更新
RxJava2 实战知识梳理(2) –
计算一段时间内数据的平均值
RxJava2 实战知识梳理(3) –
优化搜索联想功能
RxJava2 实战知识梳理(4) – 结合 Retrofit
请求新闻消息
RxJava2 实战知识梳理(5) –
简单和进阶的轮询操作
RxJava2 实战知识梳理(6) –
基于左类型的重试请求
RxJava2 实战知识梳理(7) – 基于 combineLatest
实现之输入表单验证
RxJava2 实战知识梳理(8) – 使用 publish + merge
优化先加载缓存,再念博网络数据的呼吁过程
RxJava2 实战知识梳理(9) – 使用 timer/interval/delay
实现任务调度
RxJava2 实战知识梳理(10) – 屏幕旋转导致 Activity
重建时回升任务
RxJava2 实战知识梳理(11) –
检测网络状态并活动重试请求
RxJava2 实战知识梳理(12) – 实战讲解 publish & replay & share & refCount
& autoConnect
RxJava2 实战知识梳理(13) –
如何使错误产生时莫活动终止订阅关系
RxJava2 实战知识梳理(14) – 在 token 过期时,刷新过期 token
并再度发起呼吁
RxJava2 实战知识梳理(15) – 实现一个简单的 MVP + RxJava + Retrofit
应用


一样、发生错误时已订阅的情景

RxJava备受,如果发了错误,那么 订阅者会自行终止对上游的订阅关系
,我们将造成订阅取消的错误分成两栽:

  • 上游:上游发生误,并发送onError事件为订阅者。
  • 下游:订阅者在onNext遭遇处理时来了大。

RxJava的宏图中,如果起了不当,那么订阅关系就销了。但是于一些时段,我们要于错误产生的时候绝不取消订阅,因为这么订阅者只有重新经subscribe方法才会收到信,类似之场景如监测数据源转变、RxBus的贯彻等。

咱先用少单简易的例子来演示一下端提到的点滴栽状态:

1.1 上游传递信息不时起错误

订阅者在开头时候订阅到mPublishObject,当该PublishObject出殡至第四个事件时,主动弃来一个怪,以套上游发生异常的状况。

     private void upError() {
        mPublishSubject.map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                if (integer == 4) {
                    throw new RuntimeException();
                }
                return integer;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(getNormalObserver());
    }

    private Observer<Integer> getNormalObserver() {
        return new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext=" + value);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError=" + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
    }

由控制台的出口可以看来,当第四差发送事件后,由于上游有了特别,因此订阅者收到了onError事件,之后她就是重新为无能为力吸收信息了。

1.2 订阅者处理消息不时发生错误

下面,我们重新来拘禁订阅处理消息不时发出错误的状况:

    private void downError() {
        mPublishSubject.observeOn(AndroidSchedulers.mainThread()).subscribe(getErrorObserver());
    }

    private LambdaObserver<Integer> getErrorObserver() {
        return new LambdaObserver<>(new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                Log.d(TAG, "onNext=" + value);
                if (value == 4) {
                    throw new RuntimeException();
                }
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "onError=" + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "onComplete");
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {

            }
        });
    }

咱俩以订阅者收到第四个数据的时候丢来一个不行,此时控制台的出口为如下,与地方类似,之后订阅者都没法儿吸纳到信息,因为订阅关系曾经给铲除了。

老二、发生异常时的拍卖措施

2.1 上游有误

以上游发生误的时段,一般经过再订阅的计来缓解。我们得以因错误的品类判断是否需要更订阅,重订阅的早晚以retryWhen操作符,这个我们当
RxJava2 实战知识梳理(6) –
基于左类型的重试请求
已经介绍了了。

下,我们演示一下每当上头的错误中哪恢复:

    private void upErrorIgnore() {
        mPublishSubject.map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                if (integer == 4) {
                    throw new RuntimeException("retry");
                } else if (integer == 8) {
                    throw new RuntimeException("don't retry");
                }
                return integer;
            }
        }).observeOn(AndroidSchedulers.mainThread()).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                //第一步,通过flatMap对错误进行响应。
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        //第二步:根据错误的类型判断是否需要重订阅。
                        return "retry".equals(throwable.getMessage()) ? Observable.just(0) : Observable.empty();
                    }
                });
            }
        }).subscribe(getNormalObserver());
    }

于第四次等/第八次等点击的是不是,我们独家以上游抛来一个万分,这样尽管见面沾retryWhen的回调,在内部我们分为注释中的少数部分进行处理,第四浅的时段发起重订阅,而第八破则非提倡,因此,第九个事件订阅者就收不交了,控制台的出口为:

2.2 订阅者发生错误

但是retryWhen只好处理上游发生错误的状态,对于地方说之亚栽状态并无能够处理,因此而是方介绍的第二种情形:订阅者在onNext拍卖中生出错误的景象,仍然会破订阅关系。

此地首先要感谢 Johnny
Shieh
提供的解决办法,在 RxJava 2 版本的
Rxbus
一文被,他分析了马上同题材的由来,这是盖于LambdaObserver的源码中,如果在onNext吃生出了很,那么首先会见调用onError方法,而onError中会执行撤销订阅的操作。

解决办法就是,拿 LambdaObserver
代码拷贝出来,注释掉那句,然后继续给她去实现 Observer
,代码在
RxSample
的十三段例子中。

打控制台可以看,并没排除订阅关系,在产生误后,仍然可以延续吸纳多少。


更多篇,欢迎访问我的 Android 知识梳理系列:

  • Android
    知识梳理目录:http://www.jianshu.com/p/fd82d18994ce
  • 个人主页:http://lizejun.cn
  • 私知识总结目录:http://lizejun.cn/categories/
网站地图xml地图