RxJava2 实战系列文章

RxJava2 实战知识梳理(1) – 后台执行耗时操作,实时通报 UI
更新
RxJava2 实战知识梳理(2) –
计算一段时间内数据的平均值
RxJava2 实战知识梳理(3) –
优化搜索联想功能
RxJava2 实战知识梳理(4) – 结合 Retrofit
请求新闻资讯
RxJava2 实战知识梳理(5) –
简单与进阶的轮询操作
RxJava2 实战知识梳理(6) –
基于左类型的重试请求
RxJava2 实战知识梳理(7) – 基于 combineLatest
实现的输入表单验证
RxJava2 实战知识梳理(8) – 使用 publish + merge
优化先加载缓存,再念博网络数据的请求过程
RxJava2 实战知识梳理(9) – 使用 timer/interval/delay
实现任务调度
RxJava2 实战知识梳理(10) – 屏幕旋转导致 Activity
重建时回升任务
RxJava2 实战知识梳理(11) –
检测网络状态并自动重试请求
RxJava2 实战知识梳理(12) – 实战讲解 publish & replay & share & refCount
& autoConnect
RxJava2 实战知识梳理(13) –
如何令错误有常无自行终止订阅关系
RxJava2 实战知识梳理(14) – 在 token 过期时,刷新过期 token
并重新发起呼吁
RxJava2 实战知识梳理(15) – 实现一个简单易行的 MVP + RxJava + Retrofit
应用


一、前言

以很多情报应用中,当我们进去一个初的页面,为了提升用户体验,不叫页面空白太漫长,我们一般会先行念博缓存中之数码,再错过央求网络。

今眼看篇稿子,我们将实现下面这个作用:同时提倡读取缓存、访问网络的恳求,如果缓存的数码先回来,那么就是先亮缓存的数额,而如网络的多少先返,那么即使不再显示缓存的数据。

为了给大家对立即同过程发生再深切的理解,我们介绍”先加载缓存,再要网络”这种模型的季种植实现方式,其中第四种实现可上地方我们所说的作用,而前的老三种实现虽然为能够实现均等的要求,并且可健康工作,但是在某些特殊情况下,会面世奇怪的事态:

  • 使用concat实现
  • 使用concatEager实现
  • 使用merge实现
  • 使用publish实现

二、示例

2.1 准备干活

咱们需要准备简单只Observable,分别代表 缓存数据源
纱数据源,在里填入相应的缓存数据和网络数据,为了以后演示一些突出的气象,我们好以开立它的时段指定它执行的光阴:

   //模拟缓存数据源。
    private Observable<List<NewsResultEntity>> getCacheArticle(final long simulateTime) {
        return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
            @Override
            public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
                try {
                    Log.d(TAG, "开始加载缓存数据");
                    Thread.sleep(simulateTime);
                    List<NewsResultEntity> results = new ArrayList<>();
                    for (int i = 0; i < 10; i++) {
                        NewsResultEntity entity = new NewsResultEntity();
                        entity.setType("缓存");
                        entity.setDesc("序号=" + i);
                        results.add(entity);
                    }
                    observableEmitter.onNext(results);
                    observableEmitter.onComplete();
                    Log.d(TAG, "结束加载缓存数据");
                } catch (InterruptedException e) {
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onError(e);
                    }
                }
            }
        });
    }
    //模拟网络数据源。
    private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
        return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
            @Override
            public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
                try {
                    Log.d(TAG, "开始加载网络数据");
                    Thread.sleep(simulateTime);
                    List<NewsResultEntity> results = new ArrayList<>();
                    for (int i = 0; i < 10; i++) {
                        NewsResultEntity entity = new NewsResultEntity();
                        entity.setType("网络");
                        entity.setDesc("序号=" + i);
                        results.add(entity);
                    }
                    observableEmitter.onNext(results);
                    observableEmitter.onComplete();
                    Log.d(TAG, "结束加载网络数据");
                } catch (InterruptedException e) {
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onError(e);
                    }
                }
            }
        });
    }

于结尾之下游,我们接收数据,并当页面上通过RecyclerView展开亮:

    private DisposableObserver<List<NewsResultEntity>> getArticleObserver() {
        return new DisposableObserver<List<NewsResultEntity>>() {

            @Override
            public void onNext(List<NewsResultEntity> newsResultEntities) {
                mNewsResultEntities.clear();
                mNewsResultEntities.addAll(newsResultEntities);
                mNewsAdapter.notifyDataSetChanged();
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "加载错误, e=" + throwable);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "加载完成");
            }
        };
    }

2.2 使用 concat 实现

concat举凡过剩稿子都推荐使用的计,因为其不见面生出另外问题,实现代码如下:

   private void refreshArticleUseContact() {
        Observable<List<NewsResultEntity>> contactObservable = Observable.concat(
                getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }

面就段代码的运转结果吧:

起控制台的输出可以观看,整个经过是事先得读博缓存,等缓存的数据读取了之后,才起来请网络,因此整个经过的耗时吧寡独号的相加,即2500ms

它们的法则图如下所示:

concat 原理图

于常理图被为说明了咱们眼前的景,它会连多单Observable,并且要使对等交前方一个Observable的备数据项都发送了事后,才见面开下一个Observable多少的殡葬。

那么,concat操作符的老毛病是什么为?很明显,我们白白浪费了面前读取缓存的当即段时光,能不能够而提倡读取缓存和网络的呼吁,而非是相当及读取缓存完毕后,才去要网络为?

2.3 使用 concatEager 实现

为了缓解眼前没有以提倡呼吁的题材,我们可以使用concatEager,它的以办法如下:

    private void refreshArticleUseConcatEager() {
        List<Observable<List<NewsResultEntity>>> observables = new ArrayList<>();
        observables.add(getCacheArticle(500).subscribeOn(Schedulers.io()));
        observables.add(getNetworkArticle(2000).subscribeOn(Schedulers.io()));
        Observable<List<NewsResultEntity>> contactObservable = Observable.concatEager(observables);
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }

它和concat极可怜之异就是是基本上个Observable足而且开班发出数量,如果后一个Observable放完成后,前一个Observable再有放了数据,那么它见面以继一个Observable的数先缓存起来,等到前一个Observable发出了后,才将缓存的数码发射出来。

方代码中,请求缓存的时长改呢500ms,而告网络的时长改也2000ms,运行结果吗:

那么这种实现方式的老毛病是啊吧?就是在少数异常情况下,如果读取缓存的时光若是逾网络要的年华,那么就会促成出现“网络要的结果”等待“读取缓存”这同样进程完成后才会传递让下游,白白浪费了一段时间。

咱们将请求缓存的时长改吗2000ms,而要网络的时长改呢500ms,查看控制台的出口,可以证实点的定论:

2.4 使用 merge 实现

脚,我们来拘禁一下merge操作符的言传身教:

    private void refreshArticleUseMerge() {
        Observable<List<NewsResultEntity>> contactObservable = Observable.merge(
                getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }

merge的法则图如下所示:

merge 原理图

它和concatEager如出一辙,会吃大多单Observable再就是开放数量,但是她不欲Observable以内的互相等待,而是直接发送给下游。

当缓存时间也500ms,而要网络时间吗2000ms常常,它的结果也:

在读取缓存的时刻低于求网络的时日常,这个操作符能够生好的工作,但是反之,就会见现出我们先行出示了网的数目,然后还要为刷新成原来的缓存数据。

起拖欠特别时的气象如下所示:

2.5 使用 publish 实现

使用publish的兑现如下所示:

    private void refreshArticleUsePublish() {
        Observable<List<NewsResultEntity>> publishObservable = getNetworkArticle(2000).subscribeOn(Schedulers.io()).publish(new Function<Observable<List<NewsResultEntity>>, ObservableSource<List<NewsResultEntity>>>() {

            @Override
            public ObservableSource<List<NewsResultEntity>> apply(Observable<List<NewsResultEntity>> network) throws Exception {
                return Observable.merge(network, getCacheArticle(500).subscribeOn(Schedulers.io()).takeUntil(network));
            }

        });
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        publishObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }

及时个中一共涉及到了三个操作符,publishmergetakeUnti,我们先来拘禁一下她能否解决我们事先三栽方式的先天不足:

  • 读取缓存的时间吗500ms,请求网络的时刻呢2000ms

  • 读取缓存的时日吧2000ms,请求网络的工夫为500ms

可以看出,在读取缓存的时光过请求网络时之上,仅仅只是会来得网络的数额,显示力量也:

再就是读取缓存和要网络是还要提倡的,很好地解决了前几种植实现方式的先天不足。

这边而感谢简友 无意下棋
在评价里干的题目:如果网络要先回去时发出了左(例如没有网络等)导致发送了onError事件,从而使得缓存的Observable啊束手无策发送事件,最后界面显示空白。

对此问题,我们得对纱的Observable拓展优化,让那个莫将onError事件传递让下游。其中同样栽缓解智是经采用onErrorResume操作符,它好接一个Func函数,其形参为网络发送的错,而以上游有误时会回调该函数。我们得根据错误的类型来回到一个新的Observable,让订阅者镜像到者新的Observable,并且忽略onError事件,从而避免onError事件致使整个订阅关系的截止。

此间为了避免订阅者在镜像到新的Observable时会收到额外的日,我们回来一个Observable.never(),它代表一个千古不发送事件之上游。

    private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
        return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
            @Override
            public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
                try {
                    Log.d(TAG, "开始加载网络数据");
                    Thread.sleep(simulateTime);
                    List<NewsResultEntity> results = new ArrayList<>();
                    for (int i = 0; i < 10; i++) {
                        NewsResultEntity entity = new NewsResultEntity();
                        entity.setType("网络");
                        entity.setDesc("序号=" + i);
                        results.add(entity);
                    }
                    //a.正常情况。
                    //observableEmitter.onNext(results);
                    //observableEmitter.onComplete();
                    //b.发生异常。
                    observableEmitter.onError(new Throwable("netWork Error"));
                    Log.d(TAG, "结束加载网络数据");
                } catch (InterruptedException e) {
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onError(e);
                    }
                }
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends List<NewsResultEntity>>>() {

            @Override
            public ObservableSource<? extends List<NewsResultEntity>> apply(Throwable throwable) throws Exception {
                Log.d(TAG, "网络请求发生错误throwable=" + throwable);
                return Observable.never();
            }
        });
    }

当有误时,控制台的输出如下,可以看到缓存还正常地发送给了下游:

下,我们不怕来分析一下她的兑现原理。

2.5.1 takeUntil

takeUntil的原理图如下所示:

这里,我们给sourceObservable通过takeUntil流传了其它一个otherObservable,它表示sourceObservableotherObservable放数量以后,就无同意再发射数量了,这就是恰恰满足了我们前说的“只要网络源发送了数额,那么缓存源就无应允还发射数量”。

日后,我们重新用前介绍过的merge操作符,让简单独缓存源和网络源同时开工作,去取多少。

2.5.2 publish

而上面有几许瑕疵,就是调用mergetakeUntil会面发两糟糕订阅,这时候就用采取publish操作符,它接受一个Function函数,该函数返回一个Observable,该Observable是对原Observable,也即是点网络源的Observable变之后的结果,该Observable可以被takeUntilmerge操作符所共享,从而实现只订阅一差的作用。

publish的原理图如下所示:

publish 原理图


更多篇,欢迎访问我的 Android 知识梳理系列:

  • Android
    知识梳理目录:http://www.jianshu.com/p/fd82d18994ce
  • 个人主页:http://lizejun.cn
  • 私知识总结目录:http://lizejun.cn/categories/
网站地图xml地图