GL450xJava2 实战类别小说

MuranoxJava2 实战知识梳理(1) – 后台执行耗费时间操作,实时通报 UI
更新

猎豹CS6xJava2 实战知识梳理(2) –
总结一段时间内数据的平均值

TiguanxJava2 实战知识梳理(3) –
优化搜索联想功效

XC90xJava2 实战知识梳理(4) – 结合 Retrofit
请求新闻资源音信

LacrossexJava2 实战知识梳理(5) –
不难及进阶的轮询操作

酷威xJava2 实战知识梳理(6) –
基于错误类型的重试请求

XC60xJava2 实战知识梳理(7) – 基于 combineLatest
完结的输入表单验证

帕杰罗xJava2 实战知识梳理(8) – 使用 publish + merge
优化先加载缓存,再读取互联网数据的伏乞进程

福睿斯xJava2 实战知识梳理(9) – 使用 timer/interval/delay
达成任务调度

科雷傲xJava2 实战知识梳理(10) – 荧屏旋转导致 Activity
重建时上升职分

索罗德xJava2 实战知识梳理(11) –
检测网络状态并活动重试请求

LacrossexJava2 实战知识梳理(12) – 实战讲解 publish & replay & share & refCount
& autoConnect

RAV4xJava2 实战知识梳理(13) –
怎么着使得错误发生时不自动结束订阅关系

奥迪Q7xJava2 实战知识梳理(14) – 在 token 过期时,刷新过期 token
相提并论新发起呼吁

LacrossexJava2 实战知识梳理(15) – 达成一个简便的 MVP + 兰德酷路泽xJava + Retrofit
应用


一、前言

前些天,我们来整治以下多少个我们简单弄混的定义,并用实际例子来演示,可以从
RxSample
的第玖二章中收获:

  • publish
  • reply
  • ConnectableObservable
  • connect
  • share
  • refCount
  • autoConnect

对于以上那些概念,能够用一幅图来总结:

新濠娱乐 1

从图中能够看来,那其间能够供使用者订阅的Observable能够分为四类,上边大家将次第介绍那二种Observable的特点:

  • 第一类:Cold Observable,正是我们因此Observable.createObservable.interval等创建型操作符生成的Observable
  • 第二类:由Cold Observable经过publish()或者replay(int N)操作符转换到的ConnectableObservable
  • 第三类:由ConnectableObservable经过refCount(),或者由Cold Observable经过share()转换到的Observable
  • 第四类:由ConnectableObservable经过autoConnect(int N)转换来的Observable

二、Cold Observable

Cold Observable便是大家透过Observable.createObservable.interval等制造型操作符生成的Observable,它抱有以下几个特点:

  • 当二个订阅者订阅Cold Observable时,Cold Observable会再次开首发出数量给该订阅者。
  • 当多个订阅者订阅到同1个Cold Observable,它们收到的数额是相互独立的。
  • 当贰个订阅者打消订阅Cold Observable后,Cold Observable会告一段落发射数量给该订阅者,但不会甘休发射数量给别的订阅者。

上边,大家演示一个例证,首先我们创建3个Cold Observable

    //直接订阅Cold Observable。
    private void createColdSource() {
        mConvertObservable = getSource();
    }

    private Observable<Integer> getSource() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                try {
                    int i = 0;
                    while (true) {
                        Log.d(TAG, "源被订阅者发射数据=" + i + ",发送线程ID=" + Thread.currentThread().getId());
                        mSourceOut.add(i);
                        observableEmitter.onNext(i++);
                        updateMessage();
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).subscribeOn(Schedulers.io());
    }

在开创八个订阅者,它们得以每天订阅到Cold Observable抑或吊销对它的订阅:

    private void startSubscribe1() {
        if (mConvertObservable != null && mDisposable1 == null) {
            mDisposable1 = mConvertObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "订阅者1收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
                    mSubscribe1In.add(integer);
                    updateMessage();
                }
            });
        }
    }

    private void disposeSubscribe1() {
        if (mDisposable1 != null) {
            mDisposable1.dispose();
            mDisposable1 = null;
            mSubscribe1In.clear();
            updateMessage();
        }
    }

    private void startSubscribe2() {
        if (mConvertObservable != null && mDisposable2 == null) {
            mDisposable2 = mConvertObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "订阅者2收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
                    mSubscribe2In.add(integer);
                    updateMessage();
                }
            });
        }
    }

    private void disposeSubscribe2() {
        if (mDisposable2 != null) {
            mDisposable2.dispose();
            mDisposable2 = null;
            mSubscribe2In.clear();
            updateMessage();
        }
    }

为了表达以前说到的几本本性,进入程序之后,我们会先创立该Cold Observable,之后展开一多级的操作,效果如下:

新濠娱乐 2

在上头的图中,我们做了一下几步操作:

  • 率先步:运转应用,创设Cold Observable,这时候Cold Observable尚无发送任何数据。
  • 第二步:Observer1订阅Observable,此时Cold Observable始于发送数据,Observer1也得以接到数额,即
    一个订阅者订阅 Cold Observable 时, Cold Observable
    会开端发出数量给该订阅者
  • 第三步:Observer2订阅Observable,此时Observable2也能够吸收接纳数量,不过它和Observable1吸收的数目是相互独立的,即
    当七个订阅者订阅到同两个 Cold Observable
    ,它们收到的多少是并行独立的
  • 第四步:Observer1取消对Observable的订阅,这时候Observer1收不到数据,并且Observable也不会发出数量给它,不过照旧会发出数量给Observer2,即
    当贰个订阅者撤销订阅 Cold Observable 后,Cold Observable
    会结束发射数量给该订阅者,但不会甘休发射数量给别的订阅者
  • 第五步:Observer1再次订阅Observable,这时候Observable0初始发出数量给Observer1,即
    三个订阅者订阅 Cold Observable 时, Cold Observable
    会重新开头阵出数量给该订阅者

三、由 Cold Observable 转换的 ConnectableObservable

在摸底完Cold Observable事后,我们再来看第1类的Observable,它的品种为ConnectableObservable,它是由此Cold Observable经过上面三种办法变通的:

  • .publish()
  • .reply(int N)

一旦采用.publish()创立,那么订阅者只可以收到在订阅之后Cold Observable产生的数额,而一旦运用reply(int N)成立,那么订阅者在订阅后得以接受Cold Observable在订阅从前发送的N个数据。

小编们先以publish()为例,介绍ConnectableObservable的几性格形:

  • 无论ConnectableObservable有没有订阅者,只要调用了ConnectableObservableconnect方法,Cold Observable就早首发送数据。
  • connect会重临贰个Disposable对象,调用了该对象的dispose方法,Cold Observable将会甘休发送数据,全部ConnectableObservable的订阅者也不能接收数额。
  • 在调用connect返回的Disposable对象后,尽管再一次调用了connect方法,那么Cold Observable会重新发送数据。
  • 当二个订阅者订阅到ConnectableObservable后,该订阅者会收下在订阅之后,Cold Observable发送给ConnectableObservable的数据。
  • 当五个订阅者订阅到同1个ConnectableObservable时,它们收到的数量是相同的。
  • 当三个订阅者撤消对ConnectableObservable,不会影响别的订阅者收到消息。

下边,我们创立二个ConnectableObservable,多少个订阅者之后会订阅到它,而不是Cold Observable

    //.publish()将源Observable转换成为HotObservable,当调用它的connect方法后,无论此时有没有订阅者,源Observable都开始发送数据,订阅者订阅后将可以收到数据,并且订阅者解除订阅不会影响源Observable数据的发射。
    public void createPublishSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish();
        mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
    }

和地方一样,还是用多个事例来演示,该例子的效果为:

新濠娱乐 3

  • 先是步:运维应用,通过Cold Observablepublish格局创设ConnectableObservable,并调用ConnectableObservableconnect办法,能够看看,此时虽说ConnectableObservable没有其余订阅者,然而Cold Observable也早已上马发送数据。
  • 第二步:Observer1订阅到ConnectableObservable,此时它不得不收到订阅之后Cold Observable发出的数据。
  • 第三步:Observer2订阅到ConnectableObservableCold Observable只会发出一份数据,并且Observer1Observer2接受的多少是一模一样的。
  • 第三步:Observer1取消对ConnectableObservable的订阅,Cold Observable一如既往会发出数量,Observer2依旧能够吸纳Cold Observable发射的数额。
  • 第四步:Observer1双重订阅ConnectableObservable,和第二步相同,它仍然只会接到订阅之后Cold Observable发出的数额。
  • 第五步:通过connect返回的Disposable对象,调用dispose方法,此时Cold Observable悬停发射数量,并且Observer1Observer2都收不到数据。

下面那一个现象发生的根本原因在于:未来ObserverObserver2都是订阅到ConnectableObservable,真正发生多少的Cold Observable并不知道他们的存在,和它交互的是ConnectableObservableConnectableObservable一定于三个中介,它形成上边两项职分:

  • 对于上游:通过connectdispose措施决定是或不是要订阅到Cold Observer,也便是决定了Cold Observable是否发送数据。
  • 对此下游:将Cold Observable出殡的数据转交给它的订阅者。

四、由 ConnectableObservable 转换成 Observable

ConnectableObservable转换成Observable有三种格局,我们分为两节介绍下当订阅到转换后的Observable时的光景:

  • .refCount()
  • .autoConnect(int N)

4.1 ConnectableObservable 由 refCount 转换成 Observable

经过refCount方法,ConnectableObservable能够转换来符合规律的Observable,我们称为refObservable,那里大家假若ConnectableObservable是由Cold Observable通过publish()措施转换的,对于它的订阅者,有以下几个特点:

  • 第②个订阅者订阅到refObservable后,Cold Observable开班发送数据。
  • 未来的订阅者订阅到refObservable后,只可以收到在订阅之后Cold Observable发送的多少。
  • 假诺七个订阅者打消订阅到refObservable后,若是它是时下refObservable的唯一三个订阅者,那么Cold Observable会停下发送数据;否则,Cold Observable依旧会一而再发送数据,别的的订阅者仍旧可以吸收接纳Cold Observable发送的数量。

继而上例子,大家创设贰个refObservable

    //.share()相当于.publish().refCount(),当有订阅者订阅时,源订阅者会开始发送数据,如果所有的订阅者都取消订阅,源Observable就会停止发送数据。
    private void createShareSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish().refCount();
    }

以身作则如下:

新濠娱乐 4

操作分为以下几步:

  • 第一步:通过.publish().refCount()创建由ConnectableObservable更换后的refObservable,此时Cold Observable没有发送任何新闻。
  • 第二步:Observer1订阅到refObservableCold Observable发端发送数据,Observer1接收数据。
  • 第三步:Observer2订阅到refObservable,它不得不接受在订阅之后Cold Observable出殡的数目。
  • 第四步:Observer1收回订阅,Cold Observable一而再发送数据,Observer2还能够吸收接纳数量。
  • 第五步:Observer2撤销订阅,Cold Observable悬停发送数据。
  • 第六步:Observer1再度订阅,Cold Observable双重开头发送数据。

最终验明正身一些:订阅到Cold Observable.publish().refCount()Cold Observableshare()所再次来到的Observable是等价的。

4.2 ConnectableObservable 由 autoConnect(int N) 转换成 Observable

autoConnect(int N)refCount很类似,都是将ConnectableObservable转换来普通的Observable,大家誉为autoObservable,同样大家先若是ConnectableObservable是由Cold Observable通过publish()办法生成的,它有以下几脾特性:

  • 新濠娱乐,当有N个订阅者订阅到refObservable后,Cold Observable开班发送数据。
  • 事后的订阅者订阅到refObservable后,只好收到在订阅之后Cold Observable发送的数码。
  • 只要Cold Observable起首发送数据,尽管拥有的autoObservable的订阅和都收回了订阅,Cold Observable也不会终止发送数据,假若想要Cold Observable结束发送数据,那么能够行使autoConnect(int numberOfSubscribers, Consumer connection)Consumer返回的Disposable,它的效果和ConnectableObservableconnect措施重临的Disposable相同。

其创建方法如下所示:

    //.autoConnect在有指定个订阅者时开始让源Observable发送消息,但是订阅者是否取消订阅不会影响到源Observable的发射。
    private void createAutoConnectSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish().autoConnect(1, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                mConvertDisposable = disposable;
            }
        });
    }

示范效果如下:

新濠娱乐 5

我们开始展览了之类几步操作:

  • 率先步:运转应用,创立autoConnect改换后的autoObservable
  • 第二步:Observer1订阅到autoObservable,此时满足条件,Cold Observable开始发送数据。
  • 第三步:Observer2订阅到autoObservable,它不得不接收订阅后产生的数码。
  • 第四步:Observer1撤消订阅,Cold Observable此起彼伏发送数据,Observer2照例能够接到数额。
  • 第五步:Observer2撤销订阅,Cold Observable照例三番5遍发送数据。
  • 第六步:Observer2订阅到autoObservable,它只可以收取订阅后发送的消息了。
  • 第七步:调用mConvertDisposabledisposeCold Observable停下发送数据。

五、publish 和 reply(int N) 的区别

在上头的例子个中,全体总括的特色都以建立在ConnectableObservable是由publish()变化,只所以这么做,是为着便于大家明白,无论是订阅到ConnectableObservable,还是由ConnectableObservable转换的refObservableautoObservable,使用那二种方法创制的绝无仅有分化便是,订阅者在订阅后,假设是因此publish()制造的,那么订阅者之后接到订阅后Cold Observable出殡的数量;而一旦是reply(int N)创建的,那么订阅者还是能额外收取N个之前Cold Observable发送的数码,大家用上面一个小例子来演示,订阅者订阅到的Observable如下:

    //.reply会让缓存源Observable的N个数据项,当有新的订阅者订阅时,它会发送这N个数据项给它。
    private void createReplySource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.replay(3);
        mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
    }

示范演示效果:

新濠娱乐 6

操作步骤:

  • 率先步:运行应用,通过Cold Observablereplay(3)办法创制ConnectableObservable,能够观看,此时虽说ConnectableObservable从未有过其它订阅者,不过Cold Observable也已经上马发送数据。
  • 第二步:Observer1订阅到ConnectableObservable,此时它会先接到以前发射的3个数据,之后接到订阅之后Cold Observable发射的多寡。

最后再提一下,更详细的代码大家能够从
RxSample
的第8二章中取得。


更加多作品,欢迎访问作者的 Android 知识梳理种类:

网站地图xml地图