RxJava2 实战系列文章

RxJava2 实战知识梳理(1) – 后台执行耗时操作,实时通报 UI
更新
RxJava2 实战知识梳理(2) –
计算一段时间内数据的平均值
RxJava2 实战知识梳理(3) –
优化搜索联想功能
RxJava2 实战知识梳理(4) – 结合 Retrofit
请求新闻资讯
RxJava2 实战知识梳理(5) –
简单与进阶的轮询操作
RxJava2 实战知识梳理(6) –
基于左类型的重试请求
RxJava2 实战知识梳理(7) – 基于 combineLatest
实现之输入表单验证
RxJava2 实战知识梳理(8) – 使用 publish + merge
优化先加载缓存,再念博网络数据的恳求过程
RxJava2 实战知识梳理(9) – 使用 timer/interval/delay
实现任务调度
RxJava2 实战知识梳理(10) – 屏幕旋转导致 Activity
重建时回升任务
RxJava2 实战知识梳理(11) –
检测网络状态并机关重试请求
RxJava2 实战知识梳理(12) – 实战讲解 publish & replay & share & refCount
& autoConnect
RxJava2 实战知识梳理(13) –
如何令错误有常未自行停止订阅关系
RxJava2 实战知识梳理(14) – 在 token 过期时,刷新过期 token
并重复发起呼吁
RxJava2 实战知识梳理(15) – 实现一个简练的 MVP + RxJava + Retrofit
应用


一、示例

1.1 应用场景

今,我们介绍一种植新的观,轮询操作。也就是说,我们会尝试间隔一段时间就往服务器发起一不好呼吁,在动RxJava前,该需求的兑现一般有零星种植艺术:

  • 通过Handler出殡延时音,在handleMessage倍受要服务器之后,再次发送一个延时信,直到上循环次数为止。
  • 使用Java供的定时器Timer

我们品尝用RxJava2供的操作符来贯彻即同一要求,这里演示两种植方式的轮询,并拿单次访问的次数限制以5次:

  • 稳时延:使用intervalRange操作符,每间隔3s实践同样破任务。
  • 易长时延:使用repeatWhen操作符实现,第一赖实施了任务后,等待4s双重实施第二不好任务,在其次不良任务尽到位后,等待5s,依次递增。

2.2 示例

public class PollingActivity extends AppCompatActivity {

    private static final String TAG = PollingActivity.class.getSimpleName();

    private TextView mTvSimple;
    private TextView mTvAdvance;
    private CompositeDisposable mCompositeDisposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_polling);
        mTvSimple = (TextView) findViewById(R.id.tv_simple);
        mTvSimple.setOnClickListener(new View.OnClickListener() {

            @Override
            public void onClick(View v) {
                startSimplePolling();
            }

        });
        mTvAdvance = (TextView) findViewById(R.id.tv_advance);
        mTvAdvance.setOnClickListener(new View.OnClickListener() {

            @Override
            public void onClick(View v) {
                startAdvancePolling();
            }

        });
        mCompositeDisposable = new CompositeDisposable();
    }

    private void startSimplePolling() {
        Log.d(TAG, "startSimplePolling");
        Observable<Long> observable = Observable.intervalRange(0, 5, 0, 3000, TimeUnit.MILLISECONDS).take(5).doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long aLong) throws Exception {
                doWork(); //这里使用了doOnNext,因此DisposableObserver的onNext要等到该方法执行完才会回调。
            }

        });
        DisposableObserver<Long> disposableObserver = getDisposableObserver();
        observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    private void startAdvancePolling() {
        Log.d(TAG, "startAdvancePolling click");
        Observable<Long> observable = Observable.just(0L).doOnComplete(new Action() {

            @Override
            public void run() throws Exception {
                doWork();
            }

        }).repeatWhen(new Function<Observable<Object>, ObservableSource<Long>>() {

            private long mRepeatCount;

            @Override
            public ObservableSource<Long> apply(Observable<Object> objectObservable) throws Exception {
                //必须作出反应,这里是通过flatMap操作符。
                return objectObservable.flatMap(new Function<Object, ObservableSource<Long>>() {

                    @Override
                    public ObservableSource<Long> apply(Object o) throws Exception {
                        if (++mRepeatCount > 4) {
                            //return Observable.empty(); //发送onComplete消息,无法触发下游的onComplete回调。
                            return Observable.error(new Throwable("Polling work finished")); //发送onError消息,可以触发下游的onError回调。
                        }
                        Log.d(TAG, "startAdvancePolling apply");
                        return Observable.timer(3000 + mRepeatCount * 1000, TimeUnit.MILLISECONDS);
                    }

                });
            }

        });
        DisposableObserver<Long> disposableObserver = getDisposableObserver();
        observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    private DisposableObserver<Long> getDisposableObserver() {

        return new DisposableObserver<Long>() {

            @Override
            public void onNext(Long aLong) {}

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "DisposableObserver onError, threadId=" + Thread.currentThread().getId() + ",reason=" + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "DisposableObserver onComplete, threadId=" + Thread.currentThread().getId());
            }
        };
    }

    private void doWork() {
        long workTime = (long) (Math.random() * 500) + 500;
        try {
            Log.d(TAG, "doWork start,  threadId=" + Thread.currentThread().getId());
            Thread.sleep(workTime);
            Log.d(TAG, "doWork finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        mCompositeDisposable.clear();
    }
}

startSimplePolling对诺吃稳时延轮询:

startAdvancePolling本着承诺于易长时延轮询:

其三、示例解析

脚,就让咱们一齐来分析一下端立简单独例子中提到到之知识点。

3.1 intervalRange & doOnNext 实现稳定时延轮询

对一贯时延轮询的需,采用的凡intervalRange的法来落实,它是一个创建型操作符,该Observable率先破先放一个特定的多寡,之后距离一段时间再发送一赖,它是intervalrange的结合体,这点儿只操作符的原理图为:

interval 原理图

range 原理图

欠操作符的优势在:

  • interval对照,它可指定第一只发送数据项之时延、指定发送数据项之个数。
  • range相比,它可指定两项数据中发送的时延。

intervalRange的收取参数的义为:

  • start:发送数据的开场值,为Long型。
  • count:总共发送小件数据。
  • initialDelay:发送第一只数据项时的前奏时延。
  • period:两件数据中的间隔时间。
  • TimeUnit:时间单位。

每当轮询操作着一般会进展有耗时的纱要,因此我们摘以doOnNext开展处理,它见面在下游的onNext方给回调之前调用,但是其的周转线程可以经过subscribeOn点名,下游的运转线程再经过observerOn切换会主线程,通过打印对应的线程ID足证明结果。

当求的数额项都发送了后,最后会回调onComplete方法。

3.2 repeatWhen 实现变长时延轮询

3.2.1 使用 repeatWhen 实现又订阅

因此得以经过repeatWhen来贯彻轮询,是以其吧咱提供了重订阅的效用,而还订阅有星星点点碰元素:

  • 上游告诉我们一样不好订阅已经做到,这就是得上游回调onComplete函数。
  • 俺们报告上游是否需要还订阅,通过repeatWhenFunction函数所返的Observable确定,如果该Observable发送了onComplete或者onError虽然代表未需再订阅,结束所有工艺流程;否则触发重订阅的操作。

该规律图如下所示:

repeatWhen 原理图

repeatWhen的难处在于安定义其的Function参数:

  • Function的输入是一个Observable<Object>,输出是一个泛型ObservableSource<?>
  • 如若出口的Observable发送了onComplete或者onError尽管代表不需要更订阅,结束所有工艺流程;否则触发重订阅的操作。也就是说,它
    但是当一个是不是如触发重订阅的通onNext发送的凡啊数据并无重大。
  • 对于各一样坏订阅的多少流 Function
    函数仅仅见面回调一次
    ,并且是以onComplete的时候接触,它不会见接收任何的onNext事件。
  • Function函数中,须对输入的
    Observable<Object>进行处理
    ,这里我们采取的是flatMap操作符接收上游的多寡,对于flatMap的诠释,大家可参考
    RxJava2 实战知识梳理(4) – 结合 Retrofit
    请求新闻资讯 。

如果当我们无待还订阅时,有点儿种方法:

  • 返回Observable.empty(),发送onComplete消息,但是DisposableObserver连无会见回调onComplete
  • 返回Observable.error(new Throwable("Polling work finished"))DisposableObserveronError会于回调,并受传过去的错误信息。

3.2.2 使用 Timer 实现两差订阅之间的时延

以上就是对repeatWhen的解释,与repeatWhen交互仿佛的还有retryWhen操作符,这个我们在生同样首稿子中重复介绍,接下去,我们看一下什么落实两潮事件之时延。

面前我们解析了,重订阅触发之光阴是当返回的ObservableSource发送了onNext事件后,那么我们通过该ObservableSource延殡葬一个事件就可以实现相应的急需,这里以的凡time操作符,它的原理图如下所示,也便是,在订阅完成后,等待指定的时刻她才会发送信息。

timer 原理图

3.2.3 使用 doOnComplete 完成轮询的耗时操作

是因为当订阅完成时会发送onComplete消息,那么我们就算足以当doOnComplete饱受展开轮询所而拓展的具体操作,它所运行的线程通过subscribeOn指定。


又多篇,欢迎访问我的 Android 知识梳理系列:

  • Android
    知识梳理目录:http://www.jianshu.com/p/fd82d18994ce
  • 个人主页:http://lizejun.cn
  • 民用知识总结目录:http://lizejun.cn/categories/
网站地图xml地图