科雷傲xJava2 实战体系小说

PAJEROxJava2 实战知识梳理(1) – 后台执行耗费时间操作,实时通报 UI
更新

凯雷德xJava2 实战知识梳理(2) –
总括一段时间内数据的平均值

RAV4xJava2 实战知识梳理(3) –
优化搜索联想功用

SportagexJava2 实战知识梳理(4) – 结合 Retrofit
请求消息资源消息

奥迪Q3xJava2 实战知识梳理(5) –
简单及进阶的轮询操作

GL450xJava2 实战知识梳理(6) –
基于错误类型的重试请求

HighlanderxJava2 实战知识梳理(7) – 基于 combineLatest
达成的输入表单验证

RubiconxJava2 实战知识梳理(8) – 使用 publish + merge
优化先加载缓存,再读取互连网数据的伸手进程

奥迪Q5xJava2 实战知识梳理(9) – 使用 timer/interval/delay
落成职分调度

RAV4xJava2 实战知识梳理(10) – 显示屏旋转导致 Activity
重建时回涨职务

奥迪Q3xJava2 实战知识梳理(11) –
检查和测试网络状态并机关心注重试请求

汉兰达xJava2 实战知识梳理(12) – 实战讲解 publish & replay & share & refCount
& autoConnect

福睿斯xJava2 实战知识梳理(13) –
怎么着使得错误爆发时不自动停止订阅关系

本田UR-VxJava2 实战知识梳理(14) – 在 token 过期时,刷新过期 token
并再一次发起呼吁

卡宴xJava2 实战知识梳理(15) – 达成三个简便的 MVP + PRADOxJava + Retrofit
应用


一、示例

1.1 应用场景

明天,咱们介绍一种新的现象,轮询操作。约等于说,大家会尝试间隔一段时间就向服务器发起一回呼吁,在采纳RxJava事先,该供给的贯彻一般有三种格局:

  • 通过Handler出殡延时新闻,在handleMessage中呼吁服务器之后,再一次发送二个延时音信,直到达到循环次数结束。
  • 使用Java提供的定时器Timer

咱俩尝试运用RxJava2提供的操作符来兑现这一供给,那里演示二种艺术的轮询,并将单次访问的次数限制在5次:

  • 定位时延:使用intervalRange操作符,每间隔3s推行3次任务。
  • 变长时延:使用repeatWhen操作符达成,第叁回进行完任务后,等待4s再履行第3次职分,在第③遍职分履行到位后,等待5s,依次递增。

2.2 示例

public class PollingActivity extends AppCompatActivity {

    private static final String TAG = PollingActivity.class.getSimpleName();

    private TextView mTvSimple;
    private TextView mTvAdvance;
    private CompositeDisposable mCompositeDisposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_polling);
        mTvSimple = (TextView) findViewById(R.id.tv_simple);
        mTvSimple.setOnClickListener(new View.OnClickListener() {

            @Override
            public void onClick(View v) {
                startSimplePolling();
            }

        });
        mTvAdvance = (TextView) findViewById(R.id.tv_advance);
        mTvAdvance.setOnClickListener(new View.OnClickListener() {

            @Override
            public void onClick(View v) {
                startAdvancePolling();
            }

        });
        mCompositeDisposable = new CompositeDisposable();
    }

    private void startSimplePolling() {
        Log.d(TAG, "startSimplePolling");
        Observable<Long> observable = Observable.intervalRange(0, 5, 0, 3000, TimeUnit.MILLISECONDS).take(5).doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long aLong) throws Exception {
                doWork(); //这里使用了doOnNext,因此DisposableObserver的onNext要等到该方法执行完才会回调。
            }

        });
        DisposableObserver<Long> disposableObserver = getDisposableObserver();
        observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    private void startAdvancePolling() {
        Log.d(TAG, "startAdvancePolling click");
        Observable<Long> observable = Observable.just(0L).doOnComplete(new Action() {

            @Override
            public void run() throws Exception {
                doWork();
            }

        }).repeatWhen(new Function<Observable<Object>, ObservableSource<Long>>() {

            private long mRepeatCount;

            @Override
            public ObservableSource<Long> apply(Observable<Object> objectObservable) throws Exception {
                //必须作出反应,这里是通过flatMap操作符。
                return objectObservable.flatMap(new Function<Object, ObservableSource<Long>>() {

                    @Override
                    public ObservableSource<Long> apply(Object o) throws Exception {
                        if (++mRepeatCount > 4) {
                            //return Observable.empty(); //发送onComplete消息,无法触发下游的onComplete回调。
                            return Observable.error(new Throwable("Polling work finished")); //发送onError消息,可以触发下游的onError回调。
                        }
                        Log.d(TAG, "startAdvancePolling apply");
                        return Observable.timer(3000 + mRepeatCount * 1000, TimeUnit.MILLISECONDS);
                    }

                });
            }

        });
        DisposableObserver<Long> disposableObserver = getDisposableObserver();
        observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    private DisposableObserver<Long> getDisposableObserver() {

        return new DisposableObserver<Long>() {

            @Override
            public void onNext(Long aLong) {}

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "DisposableObserver onError, threadId=" + Thread.currentThread().getId() + ",reason=" + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "DisposableObserver onComplete, threadId=" + Thread.currentThread().getId());
            }
        };
    }

    private void doWork() {
        long workTime = (long) (Math.random() * 500) + 500;
        try {
            Log.d(TAG, "doWork start,  threadId=" + Thread.currentThread().getId());
            Thread.sleep(workTime);
            Log.d(TAG, "doWork finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        mCompositeDisposable.clear();
    }
}

startSimplePolling对应于固定时延轮询:

startAdvancePolling对应于变长时延轮询:

叁 、示例解析

上边,就让大家一起来分析一下下面那个例子中关系到的知识点。

3.1 intervalRange & doOnNext 完结稳定时延轮询

对此平素时延轮询的须求,选用的是intervalRange的主意来兑现,它是八个创建型操作符,该Observable先是次首发射二个一定的数目,之后距离一段时间再发送2遍,它是intervalrange的结合体,那多个操作符的法则图为:

interval 原理图

range 原理图

该操作符的优势在于:

  • interval比较之下,它能够内定第②个发送数据项的时延、钦点发送数据项的个数。
  • range对待,它能够钦定两项数据里面发送的时延。

intervalRange的收受参数的含义为:

  • start:发送数据的初始值,为Long型。
  • count:总共发送多少项数据。
  • initialDelay:发送第一个数据项时的伊始时延。
  • period:两项数据里面的间隔时间。
  • TimeUnit:时间单位。

在轮询操作中貌似会开始展览部分耗费时间的网络请求,因而我们接纳在doOnNext拓展处理,它会在下游的onNext格局被回调从前调用,可是它的运行线程能够通过subscribeOn点名,下游的运作线程再经过observerOn切换会主线程,通过打字与印刷对应的线程ID能够表明结果。

当供给的数量项都发送实现之后,最终会回调onComplete方法。

3.2 repeatWhen 完毕变长时延轮询

3.2.1 使用 repeatWhen 达成重订阅

就此得以因而repeatWhen来落实轮询,是因为它为大家提供了重订阅的作用,而重订阅有两点成分:

  • 上游告诉我们1回订阅已经形成,那就须要上游回调onComplete函数。
  • 大家告知上游是或不是要求重订阅,通过repeatWhenFunction函数所重返的Observable确定,如果该Observable发送了onComplete或者onError则表示不供给重订阅,甘休全数流程;不然触发重订阅的操作。

其规律图如下所示:

repeatWhen 原理图

repeatWhen的难题在于怎么样定义它的Function参数:

  • Function的输入是1个Observable<Object>,输出是叁个泛型ObservableSource<?>
  • 要是出口的Observable发送了onComplete或者onError则意味着不要求重订阅,停止全体工艺流程;不然触发重订阅的操作。也正是说,它
    偏偏是当做一个是还是不是要触发重订阅的通报onNext出殡的是怎么样数据并不首要。
  • 对此每贰次订阅的数目流 Function
    函数只会回调2遍
    ,并且是在onComplete的时候接触,它不会接到任何的onNext事件。
  • Function函数中,不能够不对输入的
    Observable<Object>进行拍卖
    ,这里大家使用的是flatMap操作符接收上游的多寡,对于flatMap的阐述,我们能够参考
    奔驰G级xJava2 实战知识梳理(4) – 结合 Retrofit
    请求音信资源音信

而当大家不须求重订阅时,有三种艺术:

  • 返回Observable.empty(),发送onComplete消息,但是DisposableObserver并不会回调onComplete
  • 返回Observable.error(new Throwable("Polling work finished"))DisposableObserveronError会被回调,并接受传过去的错误音信。

3.2.2 使用 Timer 完成四回订阅之间的时延

以上就是对于repeatWhen的解释,与repeatWhen相近似的还有retryWhen操作符,这些大家在下一篇作品中再介绍,接下去,大家看一下如何贯彻五遍事件的时延。

前边大家分析过,重订阅触发的小运是在回来的ObservableSource发送了onNext事件过后,那么大家因此该ObservableSource推迟发送二个风浪就能够完成相应的必要,那里运用的是time操作符,它的原理图如下所示,也正是,在订阅完毕后,等待内定的大运它才会发送信息。

timer 原理图

3.2.3 使用 doOnComplete 完成轮询的耗费时间操作

由于在订阅达成时会发送onComplete音讯,那么大家就能够在doOnComplete中进行轮询所要实行的具体操作,它所运维的线程通过subscribeOn指定。


愈多小说,欢迎访问作者的 Android 知识梳理种类:

网站地图xml地图