RxJava2 实战系列文章

RxJava2 实战知识梳理(1) – 后台执行耗时操作,实时通报 UI
更新
RxJava2 实战知识梳理(2) –
计算一段时间内数据的平均值
RxJava2 实战知识梳理(3) –
优化搜索联想功能
RxJava2 实战知识梳理(4) – 结合 Retrofit
请求新闻消息
RxJava2 实战知识梳理(5) –
简单和进阶的轮询操作
RxJava2 实战知识梳理(6) –
基于左类型的重试请求
RxJava2 实战知识梳理(7) – 基于 combineLatest
实现之输入表单验证
RxJava2 实战知识梳理(8) – 使用 publish + merge
优化先加载缓存,再念博网络数据的伸手过程
RxJava2 实战知识梳理(9) – 使用 timer/interval/delay
实现任务调度
RxJava2 实战知识梳理(10) – 屏幕旋转导致 Activity
重建时回升任务
RxJava2 实战知识梳理(11) –
检测网络状态并自动重试请求
RxJava2 实战知识梳理(12) – 实战讲解 publish & replay & share & refCount
& autoConnect
RxJava2 实战知识梳理(13) –
如何让错误产生常无自动终止订阅关系
RxJava2 实战知识梳理(14) – 在 token 过期时,刷新过期 token
并重新发起呼吁
RxJava2 实战知识梳理(15) – 实现一个简易的 MVP + RxJava + Retrofit
应用


一、前言

今,我们来收拾以下几独大家好下手瞎的概念,并为此实际例子来演示,可以从
RxSample
的第十二回中取得:

  • publish
  • reply
  • ConnectableObservable
  • connect
  • share
  • refCount
  • autoConnect

对此上述这些概念,可以用同一幅图来概括:

新濠娱乐 1

于图备受可以视,这里面可以供使用者订阅的Observable好分为四类,下面我们用依次介绍就几乎种植Observable的特点:

  • 第一类:Cold Observable,就是咱由此Observable.createObservable.interval对等创建型操作符生成的Observable
  • 第二类:由Cold Observable经过publish()或者replay(int N)操作符转换成的ConnectableObservable
  • 第三类:由ConnectableObservable经过refCount(),或者由Cold Observable经过share()易成的Observable
  • 第四类:由ConnectableObservable经过autoConnect(int N)变成为的Observable

二、Cold Observable

Cold Observable虽是咱们由此Observable.createObservable.interval抵创建型操作符生成的Observable,它抱有以下几只特点:

  • 当一个订阅者订阅Cold Observable时,Cold Observable会再次开放数量被该订阅者。
  • 当多只订阅者订阅到同一个Cold Observable,它们收到的数码是互相独立的。
  • 当一个订阅者取消订阅Cold Observable后,Cold Observable会晤已发射数量为该订阅者,但不会见告一段落发射数量给任何订阅者。

下,我们演示一个例,首先我们创建一个Cold Observable

    //直接订阅Cold Observable。
    private void createColdSource() {
        mConvertObservable = getSource();
    }

    private Observable<Integer> getSource() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                try {
                    int i = 0;
                    while (true) {
                        Log.d(TAG, "源被订阅者发射数据=" + i + ",发送线程ID=" + Thread.currentThread().getId());
                        mSourceOut.add(i);
                        observableEmitter.onNext(i++);
                        updateMessage();
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).subscribeOn(Schedulers.io());
    }

每当开立两个订阅者,它们可以天天订阅到Cold Observable要么吊销针对它的订阅:

    private void startSubscribe1() {
        if (mConvertObservable != null && mDisposable1 == null) {
            mDisposable1 = mConvertObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "订阅者1收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
                    mSubscribe1In.add(integer);
                    updateMessage();
                }
            });
        }
    }

    private void disposeSubscribe1() {
        if (mDisposable1 != null) {
            mDisposable1.dispose();
            mDisposable1 = null;
            mSubscribe1In.clear();
            updateMessage();
        }
    }

    private void startSubscribe2() {
        if (mConvertObservable != null && mDisposable2 == null) {
            mDisposable2 = mConvertObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "订阅者2收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
                    mSubscribe2In.add(integer);
                    updateMessage();
                }
            });
        }
    }

    private void disposeSubscribe2() {
        if (mDisposable2 != null) {
            mDisposable2.dispose();
            mDisposable2 = null;
            mSubscribe2In.clear();
            updateMessage();
        }
    }

为证明之前说到之几个性状,进入次以后,我们见面预先创造该Cold Observable,之后进展相同名目繁多之操作,效果如下:

新濠娱乐 2

每当上面的图中,我们召开了一晃几乎步操作:

  • 第一步:启动以,创建Cold Observable,这时候Cold Observable从来不发送任何数。
  • 第二步:Observer1订阅Observable,此时Cold Observable开班发送数据,Observer1也可接过多少,即
    一个订阅者订阅 Cold Observable 时, Cold Observable
    会开始放数量被该订阅者
  • 第三步:Observer2订阅Observable,此时Observable2否得接纳数量,但是她同Observable1收纳的多寡是互相独立的,即
    当多只订阅者订阅到同一个 Cold Observable
    ,它们收到的数目是互独立的
  • 第四步:Observer1取消对Observable的订阅,这时候Observer1完不顶数量,并且Observable啊非会见放数量为其,但是仍然会放数量给Observer2,即
    当一个订阅者取消订阅 Cold Observable 后,Cold Observable
    会停止发射数量为该订阅者,但无会见告一段落发射数量被其他订阅者
  • 第五步:Observer1再订阅Observable,这时候Observable0开放数量让Observer1,即
    一个订阅者订阅 Cold Observable 时, Cold Observable
    会重新开放数量被该订阅者

三、由 Cold Observable 转换的 ConnectableObservable

当打听了Cold Observable下,我们重来拘禁第二类似的Observable,它的档次为ConnectableObservable,它是经过Cold Observable经下面两种植艺术转的:

  • .publish()
  • .reply(int N)

若应用.publish()始建,那么订阅者只能接收在订阅之后Cold Observable起之数,而设运用reply(int N)创建,那么订阅者在订阅后可收Cold Observable以订阅之前发送的N个数据。

咱俩先行以publish()为例,介绍ConnectableObservable的几乎独特点:

  • 无论ConnectableObservable产生没有来订阅者,只要调用了ConnectableObservableconnect方法,Cold Observable虽开发送数据。
  • connect会晤回到一个Disposable目标,调用了该目标的dispose方法,Cold Observable将会晤告一段落发送数据,所有ConnectableObservable的订阅者也无从接受数额。
  • 在调用connect返回的Disposable靶后,如果重新调用了connect方法,那么Cold Observable会面又发送数据。
  • 当一个订阅者订阅到ConnectableObservable晚,该订阅者会收下在订阅之后,Cold Observable发送给ConnectableObservable的数据。
  • 当多个订阅者订阅到和一个ConnectableObservable时不时,它们收到的多少是如出一辙的。
  • 当一个订阅者取消对ConnectableObservable,不见面影响其他订阅者收到信息。

下面,我们创建一个ConnectableObservable,两个订阅者之后会订阅到它,而未是Cold Observable

    //.publish()将源Observable转换成为HotObservable,当调用它的connect方法后,无论此时有没有订阅者,源Observable都开始发送数据,订阅者订阅后将可以收到数据,并且订阅者解除订阅不会影响源Observable数据的发射。
    public void createPublishSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish();
        mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
    }

及点一样,还是用一个例子来演示,该例子的法力也:

新濠娱乐 3

  • 首先步:启动以,通过Cold Observablepublish艺术创建ConnectableObservable,并调用ConnectableObservableconnect方式,可以看出,此时虽然ConnectableObservable不曾其他订阅者,但是Cold Observable否已起来发送数据。
  • 第二步:Observer1订阅到ConnectableObservable,此时其只能吸收订阅之后Cold Observable发射的数据。
  • 第三步:Observer2订阅到ConnectableObservableCold Observable独自见面发出一卖数据,并且Observer1Observer2收下的数目是一样的。
  • 第三步:Observer1取消对ConnectableObservable的订阅,Cold Observable依然会放数量,Observer2依旧可以收起Cold Observable放的多少。
  • 第四步:Observer1重复订阅ConnectableObservable,和老三步相同,它还是单纯会收到订阅之后Cold Observable发射的数量。
  • 第五步:通过connect返回的Disposable对象,调用dispose方法,此时Cold Observable停下发射数量,并且Observer1Observer2犹结束不交数码。

地方这些现象来的根本原因在于:现在ObserverObserver2都是订阅到ConnectableObservable,真正来多少的Cold Observable并不知道他们之在,和它相的凡ConnectableObservableConnectableObservable一定给一个中介,它形成下面两桩职责:

  • 对此上游:通过connectdispose主意决定是否要订阅到Cold Observer,也不怕是决定了Cold Observable是不是发送数据。
  • 对此下游:将Cold Observable出殡的数量转交给她的订阅者。

四、由 ConnectableObservable 转换成 Observable

ConnectableObservable转换成Observable来星星点点种植艺术,我们分为两节介绍下当订阅到转换后的Observable时不时之景象:

  • .refCount()
  • .autoConnect(int N)

4.1 ConnectableObservable 由 refCount 转换成 Observable

经过refCount方法,ConnectableObservable足变换成为健康的Observable,我们誉为refObservable,这里我们如果ConnectableObservable是由Cold Observable通过publish()法易的,对于其的订阅者,有以下几个特征:

  • 先是个订阅者订阅到refObservable后,Cold Observable发端发送数据。
  • 随后的订阅者订阅到refObservable晚,只能吸收在订阅之后Cold Observable发送的数据。
  • 而一个订阅者取消订阅到refObservable继,假如它是眼前refObservable的唯一一个订阅者,那么Cold Observable会告一段落发送数据;否则,Cold Observable照例会连续发送数据,其它的订阅者仍然可吸纳Cold Observable出殡的多寡。

跟着上例子,我们创建一个refObservable

    //.share()相当于.publish().refCount(),当有订阅者订阅时,源订阅者会开始发送数据,如果所有的订阅者都取消订阅,源Observable就会停止发送数据。
    private void createShareSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish().refCount();
    }

以身作则如下:

新濠娱乐 4

操作分为以下几步:

  • 第一步:通过.publish().refCount()创建由ConnectableObservable换后的refObservable,此时Cold Observable没发送任何消息。
  • 第二步:Observer1订阅到refObservableCold Observable起来发送数据,Observer1接收数据。
  • 第三步:Observer2订阅到refObservable,它只能吸收以订阅之后Cold Observable出殡的多寡。
  • 第四步:Observer1撤订阅,Cold Observable累发送数据,Observer2反之亦然能够收数量。
  • 第五步:Observer2撤销订阅,Cold Observable终止发送数据。
  • 第六步:Observer1又订阅,Cold Observable重开始发送数据。

最后验明正身某些:订阅到Cold Observable.publish().refCount()Cold Observableshare()所返的Observable凡是相当价格的。

4.2 ConnectableObservable 由 autoConnect(int N) 转换成 Observable

autoConnect(int N)refCount很类似,都是将ConnectableObservable改换成为一般的Observable,我们叫autoObservable,同样我们先假设ConnectableObservable是由Cold Observable通过publish()主意变的,它起以下几只特征:

  • 当有N独订阅者订阅到refObservable后,Cold Observable开发送数据。
  • 今后的订阅者订阅到refObservable继,只能吸收在订阅之后Cold Observable发送的多少。
  • 只要Cold Observable初始发送数据,即使有的autoObservable的订阅和还取消了订阅,Cold Observable为非会见告一段落发送数据,如果想使Cold Observable停止发送数据,那么可用autoConnect(int numberOfSubscribers, Consumer connection)Consumer返回的Disposable,它的企图和ConnectableObservableconnect方返回的Disposable相同。

彼创建方法如下所示:

    //.autoConnect在有指定个订阅者时开始让源Observable发送消息,但是订阅者是否取消订阅不会影响到源Observable的发射。
    private void createAutoConnectSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish().autoConnect(1, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                mConvertDisposable = disposable;
            }
        });
    }

以身作则效果如下:

新濠娱乐 5

我们进行了如下几步操作:

  • 率先步:启动以,创建autoConnect改换后底autoObservable
  • 第二步:Observer1订阅到autoObservable,此时满足条件,Cold Observable初始发送数据。
  • 第三步:Observer2订阅到autoObservable,它只能接收订阅后发的多少。
  • 第四步:Observer1撤订阅,Cold Observable延续发送数据,Observer2还可接纳数额。
  • 第五步:Observer2撤销订阅,Cold Observable还是此起彼伏发送数据。
  • 第六步:Observer2订阅到autoObservable,它不得不接收订阅后发送的信了。
  • 第七步:调用mConvertDisposabledisposeCold Observable悬停发送数据。

五、publish 和 reply(int N) 的区别

当上面的事例中,所有总结的风味都是树立以ConnectableObservable是由publish()变化,只所以这么做,是以方便大家领略,无论是订阅到ConnectableObservable,还是由ConnectableObservable转换的refObservableautoObservable,使用即时简单种方式创造的绝无仅有区别就是是,订阅者在订阅后,如果是经过publish()缔造的,那么订阅者之后接订阅后Cold Observable发送的数据;而要是reply(int N)创立的,那么订阅者还能额外收取N个之前Cold Observable出殡的数码,我们之所以脚一个粗例子来演示,订阅者订阅到之Observable如下:

    //.reply会让缓存源Observable的N个数据项,当有新的订阅者订阅时,它会发送这N个数据项给它。
    private void createReplySource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.replay(3);
        mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
    }

演示演示效果:

新濠娱乐 6

操作步骤:

  • 第一步:启动以,通过Cold Observablereplay(3)术创建ConnectableObservable,可以望,此时虽ConnectableObservable无另外订阅者,但是Cold Observable也已初步发送数据。
  • 第二步:Observer1订阅到ConnectableObservable,此时它们会预先接到之前发射的3单数据,之后接到订阅之后Cold Observable发出的多寡。

末段还取一下,更详尽的代码大家好从
RxSample
的第十二章中收获。


再度多篇,欢迎访问我之 Android 知识梳理系列:

  • Android
    知识梳理目录:http://www.jianshu.com/p/fd82d18994ce
  • 个人主页:http://lizejun.cn
  • 民用知识总结目录:http://lizejun.cn/categories/
网站地图xml地图