2021年6月份居民消费价格同比上涨1.1% 环比下降0.4%

  2021年6月份,全国居民消费价格同比上涨1.1%。其中,城市上涨1.2%,农村上涨0.7%;食品价格下降1.7%,非食品价格上涨1.7%;消费品价格上涨1.1%,服务价格上涨1.0%。上半年,全国居民消费价格比去年同期上涨0.5%。   6月份,全国居民消费价格环比下降0.4%。其中,城市下降0.4%,农村下降0.5%;食品价格下降2.2%,非食品价格持平;消费品价格下降0.6%,服务价格下降0.1%。    一、各类商品及服务价格同比变动情况   6月份,食品烟酒类价格同比下降0.4%,影响CPI(居民消费价格指数)下降约0.12个百分点。食品中,畜肉类价格下降19.5%,影响CPI下降约0.81个百分点,其中猪肉价格下降36.5%,影响CPI下降约0.80个百分点;蛋类价格上涨17.9%,影响CPI上涨约0.10个百分点;水产品价格上涨14.2%,影响CPI上涨约0.26个百分点;鲜果价格上涨3.1%,影响CPI上涨约0.06个百分点;粮食价格上涨0.7%,影响CPI上涨约0.01个百分点。     其他七大类价格同比六涨一降。其中,交通通信、教育文化娱乐、居住价格分别上涨5.8%、1.5%和0.9%,衣着、生活用品及服务、医疗保健价格分别上涨0.4%、0.3%和0.3%;其他用品及服务价格下降0.9%。    二、各类商品及服务价格环比变动情况   6月份,食品烟酒类价格环比下降1.4%,影响CPI下降约0.39个百分点。食品中,畜肉类价格下降7.6%,影响CPI下降约0.27个百分点,其中猪肉价格下降13.6%,影响CPI下降约0.22个百分点;鲜果价格下降4.5%,影响CPI下降约0.08个百分点;鲜菜价格下降2.3%,影响CPI下降约0.04个百分点。      其他七大类价格环比三涨一平三降。其中,居住、交通通信、其他用品及服务价格均上涨0.1%;医疗保健价格持平;衣着、生活用品及服务、教育文化娱乐价格均下降0.2%。  2021年6月份居民消费价格主要数据    环比涨跌幅 (%) 同比涨跌幅 (%) 上半年 同比涨跌幅(%) 居民消费价格 -0.4 1.1 0.5   其中:城市 -0.4 1.2 0.6      农村 -0.5 0.7 0.4   其中:食品 -2.2 -1.7 -0.2      非食品 0.0 1.7 0.7   其中:消费品 -0.6 1.1 0.7      服务 -0.1 1.0 0.3   其中:不包括食品和能源 -0.1 0.9 0.4 按类别分       一、食品烟酒 -1.4 -0.4 0.4   粮  食 0.0 0.7 1.2   食 用 油 0.2 7.7 7.1   鲜  菜 -2.3 0.1 3.2   畜 肉 类 -7.6 -19.5 -9.5    其中:猪  肉 -13.6 -36.5 -19.3       牛  肉 -0.3 4.8 4.1       羊  肉 -1.8 7.3 8.1   水 产 品 0.1 14.2 8.9   蛋  类 -0.6 17.9 7.4   奶  类 0.1 2.2 2.0   鲜  果 -4.5 3.1 2.6   卷  烟 0.2 1.1 0.9   酒  类 0.3 2.7 2.0 二、衣着 -0.2 0.4 0.0   服  装 -0.2 0.5 0.1   鞋  类 -0.4 -0.1 -0.3 三、居住 0.1 0.9 0.2   租赁房房租 0.2 0.7 0.0   水电燃料 -0.2 1.3 0.6 四、生活用品及服务 -0.2 0.3 0.1   家用器具 -0.3 1.0 0.2   家庭服务 0.1 2.6 2.4 五、交通通信 0.1 5.8 1.9   交通工具 -0.3 -0.8 -1.5   交通工具用燃料 2.0 23.6 8.2   交通工具使用和维修 0.0 1.3 1.5   通信工具 -0.2 6.2 5.8   通信服务 -0.1 -0.3 -0.3   邮递服务 0.0 0.0 -0.3 六、教育文化娱乐 -0.2 1.5 0.9   教育服务 0.1 1.9 1.8   旅  游 -2.0 -1.1 -4.3 七、医疗保健 0.0 0.3 0.3   中  药 0.2 1.5 1.5   西  药 0.0 -0.8 -1.5   医疗服务 0.0 0.7 0.7 八、其他用品及服务 0.1 -0.9 -1.1    附注   1.指标解释   居民消费价格指数(Consumer Price Index,简称CPI)是度量居民生活消费品和服务价格水平随着时间变动的相对数,综合反映居民购买的生活消费品和服务价格水平的变动情况。   2.统计范围   居民消费价格统计调查涵盖全国城乡居民生活消费的食品烟酒、衣着、居住、生活用品及服务、交通通信、教育文化娱乐、医疗保健、其他用品及服务等8大类、268个基本分类的商品与服务价格。   3.调查方法   采用抽样调查方法抽选确定调查网点,按照“定人、定点、定时”的原则,直接派人到调查网点或从互联网采集原始价格。数据来源于全国31个省(区、市)约500个市县、近10万家价格调查点,包括商场(店)、超市、农贸市场、服务网点和互联网电商等。   4.数据说明   由于“四舍五入”原因,有时会出现合计数据与分类数据高值或低值相同的情况。   5.基期轮换   按照统计制度规定,我国CPI每五年进行一次基期轮换,2021年1月开始编制和发布以2020年为基期的CPI。与上轮基期相比,新基期的调查分类目录、代表规格品和调查网点均有调整,分类权数也有变化,以反映居民消费结构的*新变动。经测算,本次基期轮换对CPI各月同比指数的影响平均约为0.03个百分点。 

2021年6月份工业生产者出厂价格同比上涨8.8% 环比上涨0.3%

  2021年6月份,全国工业生产者出厂价格同比上涨8.8%,环比上涨0.3%;工业生产者购进价格同比上涨13.1%,环比上涨0.8%。上半年,工业生产者出厂价格比去年同期上涨5.1%,工业生产者购进价格上涨7.1%。     一、工业生产者价格同比变动情况   工业生产者出厂价格中,生产资料价格上涨11.8%,影响工业生产者出厂价格总水平上涨约8.76个百分点。其中,采掘工业价格上涨35.1%,原材料工业价格上涨18.0%,加工工业价格上涨7.4%。生活资料价格上涨0.3%,影响工业生产者出厂价格总水平上涨约0.07个百分点。其中,食品价格上涨1.4%,衣着价格下降0.8%,一般日用品价格上涨0.3%,耐用消费品价格下降0.6%。     工业生产者购进价格中,黑色金属材料类价格上涨27.7%,有色金属材料及电线类价格上涨26.8%,燃料动力类价格上涨22.8%,化工原料类价格上涨17.1%。   二、工业生产者价格环比变动情况   工业生产者出厂价格中,生产资料价格上涨0.5%,影响工业生产者出厂价格总水平上涨约0.36个百分点。其中,采掘工业价格上涨3.9%,原材料工业价格上涨0.2%,加工工业价格上涨0.3%。生活资料价格下降0.2%,影响工业生产者出厂价格总水平下降约0.05个百分点。其中,食品价格下降0.5%,衣着价格下降0.3%,一般日用品价格下降0.2%,耐用消费品价格上涨0.1%。   工业生产者购进价格中,燃料动力类价格上涨2.9%,黑色金属材料类价格上涨1.7%,有色金属材料及电线类价格上涨0.7%,化工原料类价格上涨0.1%。 2021年6月工业生产者价格主要数据    环比涨跌幅 (%) 同比涨跌幅 (%) 上半年 同比涨跌幅(%) 一、工业生产者出厂价格 0.3 8.8 5.1   生产资料 0.5 11.8 6.8    采掘 3.9 35.1 18.5    原材料 0.2 18.0 10.4    加工 0.3 7.4 4.3   生活资料 -0.2 0.3 0.1    食品 -0.5 1.4 1.8    衣着 -0.3 -0.8 -0.9    一般日用品 -0.2 0.3 0.2    耐用消费品 0.1 -0.6 -1.2 二、工业生产者购进价格 0.8 13.1 7.1   燃料、动力类 2.9 22.8 8.1   黑色金属材料类 1.7 27.7 18.4   有色金属材料及电线类 0.7 26.8 18.9   化工原料类 0.1 17.1 8.7   木材及纸浆类 0.3 7.4 3.7   建筑材料及非金属类 0.8 5.7 0.9   其它工业原材料及半成品类 0.5 3.8 1.8   农副产品类 -1.3 5.4 5.8   纺织原料类 0.2 4.8 2.0 三、主要行业出厂价格       煤炭开采和洗选业 5.2 37.4 17.5 石油和天然气开采业 2.5 53.6 26.7 黑色金属矿采选业 5.4 52.6 37.6 有色金属矿采选业 1.6 16.8 13.6 非金属矿采选业 0.5 2.5 1.4 农副食品加工业 -0.9 4.3 5.0 食品制造业 0.0 1.5 1.0 酒、饮料及精制茶制造业 0.1 1.9 1.9 烟草制品业 0.2 0.7 0.5 纺织业 0.0 3.8 0.7 纺织服装、服饰业 -0.2 -0.4 -0.6 木材加工和木、竹、藤、棕、草制品业 -0.1 0.8 0.3 造纸和纸制品业 0.0 7.8 3.8 印刷和记录媒介复制业 -0.3 0.9 -0.2 石油、煤炭及其他燃料加工业 3.1 36.1 14.8 化学原料和化学制品制造业 0.2 20.3 12.4 医药制造业 -0.3 -0.9 -0.4 化学纤维制造业 -0.8 17.0 9.6 橡胶和塑料制品业 0.2 3.4 1.5 非金属矿物制品业 0.0 2.5 -0.2 黑色金属冶炼和压延加工业 -0.7 34.4 24.5 有色金属冶炼和压延加工业 -0.1 27.8 21.0 金属制品业 0.8 7.5 4.4 通用设备制造业 0.4 1.4 0.5 汽车制造业 0.2 -0.4 -0.7 铁路、船舶、航空航天和其他运输设备制造业 0.0 0.4 0.2 计算机、通信和其他电子设备制造业 0.2 -0.6 -1.4 电力、热力生产和供应业 -0.4 -0.7 -0.9 燃气生产和供应业 -0.1 3.5 1.8 水的生产和供应业 0.3 1.4 0.8    附注   1.指标解释   工业生产者价格指数包括工业生产者出厂价格指数(Producer Price Index for Industrial Products, 简称PPI)和工业生产者购进价格指数。   工业生产者出厂价格指数反映工业企业产品*次出售时的出厂价格的变化趋势和变动幅度。   工业生产者购进价格指数反映工业企业作为中间投入产品的购进价格的变化趋势和变动幅度。   2.统计范围   工业生产者出厂价格统计调查涵盖40个工业行业大类、1300多个基本分类的工业产品价格;工业生产者购进价格统计调查涵盖9大类、800多个基本分类的工业产品价格。   3.调查方法   工业生产者价格调查采取重点调查与典型调查相结合的调查方法,涉及全国4万多家工业企业。   4.统计标准   工业行业划分标准的依据是《国民经济行业分类》(GB/T4754-2017)。   5.数据说明   由于“四舍五入”原因,有时会出现合计数据与分类数据高值或低值相同的情况。   6.基期轮换   根据统计制度规定,我国工业生产者价格统计调查每五年进行一次基期轮换,2021年1月开始编制和发布以2020年为基期的价格指数。与上轮基期相比,新基期的调查分类目录、代表规格品和调查企业均有调整,分类权数也有变化,以反映工业生产结构的*新变动。经测算,基期轮换对各月同比指数影响平均约为0.05个百分点,在统计可接受范围内。 

国家统计局城市司高级统计师董莉娟解读2021年6月份CPI和PPI数据

2021年6月份CPI和PPI同比涨幅均略有回落 ——国家统计局城市司高级统计师董莉娟解读2021年6月份CPI和PPI数据   国家统计局今天发布了2021年6月份全国CPI(居民消费价格指数)和PPI(工业生产者出厂价格指数)数据。对此,国家统计局城市司高级统计师董莉娟进行了解读。   一、CPI环比降幅扩大,同比涨幅回落   6月份,我国经济保持稳定恢复,消费市场供应总体充足,居民消费价格运行平稳。   从环比看,CPI下降0.4%,降幅比上月扩大0.2个百分点。其中,食品价格下降2.2%,降幅扩大0.5个百分点,影响CPI下降约0.40个百分点。食品中,受生猪产能持续恢复、压栏肥猪集中出栏及消费需求季节性偏弱等因素影响,猪肉价格继续下降13.6%;鲜菜、鲜果大量上市,价格分别下降2.3%和4.5%;受供给持续偏紧等因素影响,淡水鱼价格继续上涨2.4%。非食品价格由上月上涨0.2%转为持平。非食品中,因国际原油价格上涨,汽油和柴油价格分别上涨2.1%和2.3%;出游淡季叠加局部地区疫情影响,飞机票、宾馆住宿、旅游和交通工具租赁费价格均下降,降幅在1.3%—9.2%之间。   从同比看,CPI上涨1.1%,涨幅比上月回落0.2个百分点。其中,食品价格由上月上涨0.3%转为下降1.7%,影响CPI下降约0.31个百分点,主要是猪肉价格降幅扩大带动。食品中,猪肉价格下降36.5%,降幅比上月扩大12.7个百分点;淡水鱼价格上涨33.5%,涨幅回落0.2个百分点;鸡蛋和食用植物油价格分别上涨21.7%和9.5%,涨幅均有扩大。非食品价格上涨1.7%,涨幅比上月扩大0.1个百分点,影响CPI上涨约1.38个百分点。非食品中,飞机票价格上涨27.0%,涨幅回落5.3个百分点;汽油、柴油和液化石油气价格分别上涨24.3%、26.8%和11.1%,涨幅均有扩大;空调、电视机、台式计算机及住房装潢材料等工业消费品价格均有上涨,涨幅在1.8%—3.2%之间。   据测算,在6月份1.1%的同比涨幅中,去年价格变动的翘尾影响约为0.8个百分点,比上月扩大0.1个百分点;新涨价影响约为0.3个百分点,比上月回落0.3个百分点。扣除食品和能源价格的核心CPI同比上涨0.9%,涨幅与上月相同。   二、PPI环比和同比涨幅均有回落   6月份,国内大宗商品保供稳价政策效果初步显现,市场供求关系趋于改善,工业品价格涨势有所趋缓。   从环比看,PPI上涨0.3%,涨幅比上月回落1.3个百分点。其中,生产资料价格上涨0.5%,涨幅回落1.6个百分点;生活资料价格由上月上涨0.1%转为下降0.2%。受原材料保供稳价政策影响,钢材、有色金属等行业价格过快上涨势头得到初步遏制,价格由涨转降,其中黑色金属冶炼和压延加工业价格下降0.7%,有色金属冶炼和压延加工业价格下降0.1%。高温天气导致动力煤需求较旺,带动煤炭开采和洗选业价格上涨5.2%,但在增产增供措施作用下,涨幅回落5.4个百分点。国际原油价格波动上行,带动国内石油相关行业价格涨幅扩大,其中石油开采价格上涨3.0%,精炼石油产品制造价格上涨2.5%,涨幅分别扩大0.8和0.1个百分点。此外,农资价格上涨较快,其中肥料制造价格上涨4.8%,农药制造价格上涨1.2%。   从同比看,PPI上涨8.8%,涨幅比上月回落0.2个百分点。其中,生产资料价格上涨11.8%,涨幅回落0.2个百分点;生活资料价格上涨0.3%,回落0.2个百分点。主要行业中,价格涨幅回落的有石油和天然气开采业,上涨53.6%,回落45.5个百分点;黑色金属冶炼和压延加工业,上涨34.4%,回落3.7个百分点;有色金属冶炼和压延加工业,上涨27.8%,回落2.6个百分点;化学原料和化学制品制造业,上涨20.3%,回落0.6个百分点。价格涨幅扩大的有煤炭开采和洗选业,上涨37.4%,扩大7.7个百分点;石油、煤炭及其他燃料加工业,上涨36.1%,扩大1.8个百分点。   据测算,在6月份8.8%的同比涨幅中,去年价格变动的翘尾影响约为2.4个百分点,比上月回落0.6个百分点;新涨价影响约为6.4个百分点,扩大0.4个百分点。 

Rxlifecycle(三):常出现的问题

由于rx是从下到上的执行onsubscribe()方法,然后再自上到下的执行subscribe()方法,而rxlifecycle是使用takeUntil方法来停止消息,只能终止当前的上一个onsubscribe()方法的调用,所以顺便不同会出现以下问题

坑1

    Observable.just("hello world!")
            .compose(this.<String>bindUntilEvent(ActivityEvent.PAUSE))
            .flatMap(new Func1<String, Observable<Long>>() {
                @Override
                public Observable<Long> call(String s) {
                    return Observable.interval(1, TimeUnit.SECONDS);
                }
            })

            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.i(TAG, "....oh,oh,no!!..........." + aLong);
                }
            });

 

activity生命周期paused的时候

Log.i(TAG, "....oh,oh,no!!..........." + aLong);

还会执行么??会会…

如果你想全部都不执行:

Observable.just("hello world!")
            .flatMap(new Func1<String, Observable<Long>>() {
                @Override
                public Observable<Long> call(String s) {
                    return Observable.interval(1, TimeUnit.SECONDS);
                }
            })
            //fuck....here
            .compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.i(TAG, "....oh,oh,no!!..........." + aLong);
                }
            });

坑2

    Observable.interval(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "Unsubscribing subscription ......");
                }
            })
            .doOnNext(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.i(TAG, "........fuck..........." + aLong);
                }
            })
            .flatMap(new Func1<Long, Observable<String>>() {
                @Override
                public Observable<String> call(Long aLong) {
                    return Observable.just(aLong + "");
                }
            })
            .compose(this.<String>bindUntilEvent(ActivityEvent.PAUSE))
            .subscribe(new Action1<String>() {
                @Override
                public void call(String num) {
                    Log.i(TAG, "..........shit..........." + num);
                }
            });

 

activity在paused的时候,

Log.i(TAG, "........fuck..........." + aLong);
Log.i(TAG, "..........shit..........." + num);

都不会执行…

而且会unsubscribe

Rxlifecycle(二):源码解析

1.结构

Rxlifecycle代码很少,也很好理解,来看核心类。

  • 接口ActivityLifecycleProvider

    RxFragmentActivity、RxAppCompatActivity、RxFragment等类所有的组件类皆实现这个借口

  • 类RxLifecycle

2.详细分析

以RxAppCompatActivity入手来分析。

  1. 初始化一个BehaviorSubject,Subject因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。Subject的详细介绍:https://mcxiaoke.gitbooks.io/rxdocs/content/Subject.html
    private final BehaviorSubject<ActivityEvent> 
        lifecycleSubject = BehaviorSubject.create();
  2. 在activity每个生命的周期Subject发射相对应的事件。
    @Override
    @CallSuper
    protected void onStart() {
        super.onStart();
        //发送start事件
        lifecycleSubject.onNext(ActivityEvent.START);
    }
    ....

分析RxLifecycle核心的方法:bindUntilActivityEvent

  1. bindUntilActivityEvent也就是调用的bindUntilEvent
    private static <T, R> Observable.Transformer<T, T> bindUntilEvent(final Observable<R> lifecycle,        
    //返回 Transformer                                  final R event) {
    return new Observable.Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> source) {
            return source.takeUntil(
                lifecycle.takeFirst(new Func1<R, Boolean>() {
                    @Override
                    public Boolean call(R lifecycleEvent) {
                        return lifecycleEvent == event;
                    }
                })
            );
        }
    };
    }

     

这个方法接收两个参数,lifecycle就是activity里面的BehaviorSubject对象,event就是要我们设置的要在activity哪个生命周期取消订阅的ActivityEvent对象。

返回参数是Transformer,用来结合compose使用 Transformer相当于一个过滤器,Observable call(Observable source) 接收一个Observable然后经过处理再返回一个Observable

这个方法从里到外一层一层剥开:

lifecycle.takeFirst(new Func1<R, Boolean>() {
                    @Override
                    public Boolean call(R lifecycleEvent) {
                        return lifecycleEvent == event;
                    }
                })

如果lifecycleEvent == event结果为true,lifecycle既BehaviorSubject对象发射一个数据。

lifecycleEvent是BehaviorSubject发射的数据,既ActivityEvent对象,比如在onStart时候lifecycleSubject.onNext(ActivityEvent.START)发送的ActivityEvent.START。 event是传递进来的参数。

接着上看

return source.takeUntil()

lifecycle*核心的就是这个takeUntil。

source就是要调用compose的原始的Observable,就是例子中这个Observable

  Observable.interval(1, TimeUnit.SECONDS)
      .compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
      ...

来看takeUntil:

Returns an Observable that emits the items emitted by the source Observable until a second Observable emits an item.

如果lifecycle.takeFirst发射了一条数据,takeUntil就回触发,source Observable就回停止发射数据,执行Unsubscribe,流自动结束。

分析RxLifecycle核心的方法:bindUntilActivityEvent

上代码

private static <T, R> Observable.Transformer<T, T> bind(Observable<R> lifecycle,
                                                        final Func1<R, R> correspondingEvents) {
        ...
    // Make sure we're truly comparing a single stream to itself
    final Observable<R> sharedLifecycle = lifecycle.share();

    // Keep emitting from source until the corresponding event occurs in the lifecycle
    return new Observable.Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> source) {
            return source.takeUntil(
                Observable.combineLatest(
                    sharedLifecycle.take(1).map(correspondingEvents),
                    sharedLifecycle.skip(1),
                    new Func2<R, R, Boolean>() {
                        @Override
                        public Boolean call(R bindUntilEvent, R lifecycleEvent) {
                            return lifecycleEvent == bindUntilEvent;
                        }
                    })
                    .onErrorReturn(RESUME_FUNCTION)
                    .takeFirst(SHOULD_COMPLETE)
            );
        }
    };
}
  1. .share()操作符:
  2. 来看Observable.combineLatest,这个操作符接收三个参数。

    *个参数:取BehaviorSubject发射的数据中的*个,然后转换成对应的生命周期。例如在onStart()中调用了bindToLifecycle,take(1)后的数据是ActivityEvent.START,经过map(),返回ActivityEvent.STOP。

    第二个参数:从BehaviorSubject发射的数据中经过.skip(1)操作符,过滤掉*个数据。例如在onStart()中调用了bindToLifecycle,在后续的生命周期中会收到,ActivityEvent.RESUME、ActivityEvent.PAUSE、ActivityEvent.STOP、ActivityEvent.DESTROY

    第三个参数:作用是combineFunction,把前两个参数*近发射的数据按照规则进行合并。规则是比对两次事件是否相等,然后合并后数据返回Boolean结果。比如params2发射ActivityEvent.RESUME的时候,和params1发射的ActivityEvent.STOP进行比对,返回false结果;params2发射ActivityEvent.STOP的时候,和params1发射的ActivityEvent.STOP进行比对,返回true结果。

  3. onErrorReturn()
    private static final Func1<Throwable, Boolean> RESUME_FUNCTION = new Func1<Throwable, Boolean>() {
    @Override
    public Boolean call(Throwable throwable) {
        if (throwable instanceof OutsideLifecycleException) {
            return true;
        }
    
        Exceptions.propagate(throwable);
        return false;
    }
    };

    如果发生错误,判断是否是自定义错误类型 OutsideLifecycleException,如果是,则返回true,否则其他错误类型返回false。

  4. .takeFirst(SHOULD_COMPLETE)
    private static final Func1<Boolean, Boolean> SHOULD_COMPLETE = new Func1<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean shouldComplete) {
            return shouldComplete;
        }
    };

    返回*个结果是true的数据。如果combineLatest链中返回false,则不发射任何数据。

  5. source.takeUntil

    如果combineLatest.onErrorReturn.takeFirst链返回true,则takeUntil操作符终止订阅,source Observable就回停止发射数据,执行Unsubscribe,流自动结束。

OVER!

Rxlifecycle(一):使用

Rxlifecycle使用非常方便简单,如下:

1.集成

build.gradle添加

   //Rxlifecycle
   compile 'com.trello:rxlifecycle:0.3.1'
   compile 'com.trello:rxlifecycle-components:0.3.1'

   //Rxjava
   compile 'io.reactivex:rxjava:1.0.16'

 

Components包中包含RxActivity、RxFragment等等,可以用Rxlifecycle提供的,也可以自定义。

2.Sample解析

官方sample源码: 两种使用方法:

1.手动设置取消订阅的时机,例子1、例子3

2.绑定生命周期,自动取消订阅,例子2

public class MainActivity extends RxAppCompatActivity {

//Note:Activity需要继承RxAppCompatActivity,fragment需要继承RxFragment,等等
//可以使用的组件在components包下面

private static final String TAG = "RxLifecycle";

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    Log.d(TAG, "onCreate()");
    setContentView(R.layout.activity_main);

    // Specifically bind this until onPause()

    //Note:例子1:
    Observable.interval(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "Unsubscribing subscription from onCreate()");
                }
            })
            //Note:手动设置在activity onPause的时候取消订阅
            .compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long num) {
                    Log.i(TAG, "Started in onCreate(), running until onPause(): " + num);
                }
            });
}

@Override
protected void onStart() {
    super.onStart();
    Log.d(TAG, "onStart()");

    //Note:例子2:
    // Using automatic unsubscription, this should determine that the correct time to
    // unsubscribe is onStop (the opposite of onStart).
    Observable.interval(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "Unsubscribing subscription from onStart()");
                }
            })
            //Note:bindToLifecycle的自动取消订阅示例,因为是在onStart的时候调用,所以在onStop的时候自动取消订阅
            .compose(this.<Long>bindToLifecycle())
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long num) {
                    Log.i(TAG, "Started in onStart(), running until in onStop(): " + num);
                }
            });
}

@Override
protected void onResume() {
    super.onResume();
    Log.d(TAG, "onResume()");

    //Note:例子3:
    // `this.<Long>` is necessary if you're compiling on JDK7 or below.
    // If you're using JDK8+, then you can safely remove it.
    Observable.interval(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "Unsubscribing subscription from onResume()");
                }
            })
            //Note:手动设置在activity onDestroy的时候取消订阅
            .compose(this.<Long>bindUntilEvent(ActivityEvent.DESTROY))
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long num) {
                    Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num);
                }
            });
}
...

 

RxJava操作符之Share, Publish, Refcount

看源码知道.share()操作符是.publish().refcount()调用链的包装。

先来看ConnectedObservable

“ConnectedObservable” – This is a kind of observable which doesn’t emit items even if subscribed to.
It only starts emitting items after its .connect() method is called.

因为这个原因,在ConnectedObservable的connect这个方法被调用之前,connected obesrvable也被认为是“冷”和不活跃。

再看publish方法

.publish()– This method allows us to change an ordinary observable into a “ConnectedObservable”.
Simply call this method on an ordinary observable and it becomes a connected one.

现在我们知道了share操作符的1/2,那么为什么需要运用Connected Observable这个操作符呢?文档上是这么写的:

In this way you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.

这就意味着publish可以调用多个subscriber。当你有超过一个订阅者的时候,处理每个订阅和正确的销毁他们变得棘手。 为了使这个更方便,Rx发明了这个魔法操作符refcount():

refcount() – This operator keeps track of how many subscribers are subscribed to the resulting Observable and
refrains from disconnecting from the source ConnectedObservable until all such Observables are unsubscribed.

refcount本质上在后台维护着一个引用计数器,当一个subscription需要取消订阅或者销毁的时候,发出一个正确的动作。

我们再次看一下debouncedBuffer的例子,看一下在哪,share是怎么用的。

Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
// which is really the same as:
Observable<Object> tapEventEmitter = _rxBus.toObserverable().publish().refcount();

我们现在有了一个”shareable”的名字叫”tapEventEmitter”的observable。 因为他是可以分享的,而且还不是“live”(share操作符中的publish调用使其变成一个ConnectedObservable), 我们可以用他构成我们的Observables,而且要确保我们有了一个原始的observable的引用 (这个例子中原始的observable是_rxBus.toObserverable()).

Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
Observable<Object> debouncedEventEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
tapEventEmitter.buffer(debouncedEventEmitter)
//...

所有的这一切看起来都很好。然而这个实现会有一个可能的竞争条件。因为这有两个subscribers(debounce and buffer)而且会在不同的时间点发生,所以竞争条件就会发生。 记住RxBus是由hot/live Subject支持的不断的发射数据。通过使用share操作符,我们保证引用的是同一个资源。 而不是subscribers在不同的时间点订阅,他们会收到准确的相同的数据。

The race condition is when the two consumers subscribe. Often on a hot stream it doesn’t matter when subscribers come and go,and refCount is perfect for that.
The race condition refCount protects against is having only 1 active subscription upstream. However,if 2 Subscribers subscribe to a refcounted stream that emits 1, 2, 3, 4, 5, the first may get 1, 2, 3, 4, 5 and the second may get 2, 3, 4, 5.

To ensure all subscribers start at exactly the same time and get the exact same values, refCount can not be used.
Either ConnectableObservable with a manual, imperative invocation of connect needs to be done, or the variant of publish(function)which connects everything within the function before connecting the upstream.

在我们的用法中几乎立即执行所以没有什么关系。但是我们原始的意图是把debouncedBuffer方法作为一个单独的操作符。 如果相同的事件没有被发射出去,从概念上看起来是不正确的。

通过Bean后来的建议,我添加了一个更好的第三方的实现,用来处理这种竞争条件。

// don't start emitting items just yet by turning the observable to a connected one
ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish();

tapEventEmitter.publish((Func1) (stream) -> {

// inside `publish`, "stream" is truly multicasted

// applying the same technique for getting a debounced buffer sequence
    return stream.buffer(stream.debounce(1, TimeUnit.SECONDS));

}).subscribe((Action1) (taps) {
_showTapCount(taps.size());
});

// start listening to events now
tapEventEmitter.connect();

RxJava2 源码解析(二)

概述

承接上一篇RxJava2 源码解析(一),
本系列我们的目的:

知道源头(Observable)是如何将数据发送出去的。
知道终点(Observer)是如何接收到数据的。
何时将源头和终点关联起来的
知道线程调度是怎么实现的
知道操作符是怎么实现的

本篇计划讲解一下4,5.

RxJava*强大的莫过于它的线程调度 和 花式操作符。
map操作符

map是一个高频的操作符,我们首先拿他开刀。
例子如下,源头Observable发送的是String类型的数字,利用map转换成int型,*终在终点Observer接受到的也是int类型数据。:

final Observable<String> testCreateObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(“1”);
e.onComplete()
}
});

Observable<Integer> map = testCreateObservable.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
map.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe() called with: d = [” + d + “]”);
}

@Override
public void onNext(Integer value) {
Log.d(TAG, “onNext() called with: value = [” + value + “]”);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, “onError() called with: e = [” + e + “]”);
}

@Override
public void onComplete() {
Log.d(TAG, “onComplete() called”);
}
});

我们看一下map函数的源码:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判空略过
ObjectHelper.requireNonNull(mapper, “mapper is null”);
//RxJavaPlugins.onAssembly()是hook 上文提到过
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

RxJavaPlugins.onAssembly()是hook 上文提到过,所以我们只要看ObservableMap,它就是返回到我们手里的Observable:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
//将function变换函数类保存起来
final Function<? super T, ? extends U> function;

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//super()将上游的Observable保存起来 ,用于subscribeActual()中用。
super(source);
this.function = function;
}

@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}

它继承自AbstractObservableWithUpstream,该类继承自Observable,很简单,就是将上游的ObservableSource保存起来,做一次wrapper,所以它也算是装饰者模式的提现,如下:

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
//将上游的`ObservableSource`保存起来
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}

关于ObservableSource,代表了一个标准的无背压的 源数据接口,可以被Observer消费(订阅),如下:

public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}

所有的Observable都已经实现了它,所以我们可以认为Observable和ObservableSource在本文中是相等的:

public abstract class Observable<T> implements ObservableSource<T> {

所以我们得到的ObservableMap对象也很简单,就是将上游的Observable和变换函数类Function保存起来。
Function的定义超级简单,就是一个接口,给我一个T,还你一个R.

public interface Function<T, R> {
R apply(T t) throws Exception;
}

本例写的是将String->int.

重头戏,subscribeActual()是订阅真正发生的地方,ObservableMap如下编写,就一句话,用MapObserver订阅上游Observable。:

@Override
public void subscribeActual(Observer<? super U> t) {
//用MapObserver订阅上游Observable。
source.subscribe(new MapObserver<T, U>(t, function));
}

MapObserver也是装饰者模式,对终点(下游)Observer修饰。

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//super()将actual保存起来
super(actual);
//保存Function变量
this.mapper = mapper;
}
@Override
public void onNext(T t) {
//done在onError 和 onComplete以后才会是true,默认这里是false,所以跳过
if (done) {
return;
}
//默认sourceMode是0,所以跳过
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
//下游Observer接受的值
U v;
//这一步执行变换,将上游传过来的T,利用Function转换成下游需要的U。
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), “The mapper function returned a null value.”);
} catch (Throwable ex) {
fail(ex);
return;
}
//变换后传递给下游Observer
actual.onNext(v);
}

到此我们梳理一下流程:
订阅的过程,是从下游到上游依次订阅的。

即终点 Observer 订阅了 map 返回的ObservableMap。
然后map的Observable(ObservableMap)在被订阅时,会订阅其内部保存上游Observable,用于订阅上游的Observer是一个装饰者(MapObserver),内部保存了下游(本例是终点)Observer,以便上游发送数据过来时,能传递给下游。
以此类推,直到源头Observable被订阅,根据上节课内容,它开始向Observer发送数据。

数据传递的过程,当然是从上游push到下游的,

源头Observable传递数据给下游Observer(本例就是MapObserver)
然后MapObserver接收到数据,对其变换操作后(实际的function在这一步执行),再调用内部保存的下游Observer的onNext()发送数据给下游
以此类推,直到终点Observer。

线程调度subscribeOn

简化问题,代码如下:

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, “subscribe() called with: e = [” + e + “]” + Thread.currentThread());
e.onNext(“1”);
e.onComplete();
}
//只是在Observable和Observer之间增加了一句线程调度代码
}).subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe() called with: d = [” + d + “]”);
}
@Override
public void onNext(String value) {
Log.d(TAG, “onNext() called with: value = [” + value + “]”);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, “onError() called with: e = [” + e + “]”);
}
@Override
public void onComplete() {
Log.d(TAG, “onComplete() called”);
}
});

只是在Observable和Observer之间增加了一句线程调度代码:.subscribeOn(Schedulers.io()).
查看subscribeOn()源码:

public final Observable<T> subscribeOn(Scheduler scheduler) {
//判空略过
ObjectHelper.requireNonNull(scheduler, “scheduler is null”);
//抛开Hook,重点还是ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

等等,怎么有种似曾相识的感觉,大家可以把文章向上翻,看看map()的源码。
和subscribeOn()的套路如出一辙,那么我们根据上面的结论,
先猜测ObservableSubscribeOn类也是一个包装类(装饰者),点进去查看:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
//保存线程调度器
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//map的源码中我们分析过,super()只是简单的保存ObservableSource
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//1  创建一个包装Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//2  手动调用 下游(终点)Observer.onSubscribe()方法,所以onSubscribe()方法执行在 订阅处所在的线程
s.onSubscribe(parent);
//3 setDisposable()是为了将子线程的操作加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//4 此时已经运行在相应的Scheduler 的线程中
source.subscribe(parent);
}
}));
}

和map套路大体一致,ObservableSubscribeOn自身同样是个包装类,同样继承AbstractObservableWithUpstream。
创建了一个SubscribeOnObserver类,该类按照套路,应该也是实现了Observer、Disposable接口的包装类,让我们看一下:

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
//真正的下游(终点)观察者
final Observer<? super T> actual;
//用于保存上游的Disposable,以便在自身dispose时,连同上游一起dispose
final AtomicReference<Disposable> s;

SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}

@Override
public void onSubscribe(Disposable s) {
//onSubscribe()方法由上游调用,传入Disposable。在本类中赋值给this.s,加入管理。
DisposableHelper.setOnce(this.s, s);
}

//直接调用下游观察者的对应方法
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}

//取消订阅时,连同上游Disposable一起取消
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
//这个方法在subscribeActual()中被手动调用,为了将Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}

这两个类根据上一节的铺垫加上注释,其他都好理解,稍微不好理解的应该是下面两句代码:

//ObservableSubscribeOn类
//3 setDisposable()是为了将子线程的操作加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//4 此时已经运行在相应的Scheduler 的线程中
source.subscribe(parent);
}
}));

//SubscribeOnObserver类
//这个方法在subscribeActual()中被手动调用,为了将Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}

其中scheduler.scheduleDirect(new Runnable()..)方法源码如下:

/**
* Schedules the given task on this scheduler non-delayed execution.
* …..
*/
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

从注释和方法名我们可以看出,这个传入的Runnable会立刻执行。
再继续往里面看:

public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
//class Worker implements Disposable ,Worker本身是实现了Disposable
final Worker w = createWorker();
//hook略过
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//开始在Worker的线程执行任务,
w.schedule(new Runnable() {
@Override
public void run() {
try {
//调用的是 run()不是 start()方法执行的线程的方法。
decoratedRun.run();
} finally {
//执行完毕会 dispose()
w.dispose();
}
}
}, delay, unit);
//返回Worker对象
return w;
}

createWorker()是一个抽象方法,由具体的Scheduler类实现,例如IoScheduler对应的Schedulers.io().

public abstract Worker createWorker();

初看源码,为了了解大致流程,不宜过入深入,先点到为止。
OK,现在我们总结一下scheduler.scheduleDirect(new Runnable()..)的重点:

传入的Runnable是立刻执行的。
返回的Worker对象就是一个Disposable对象,
Runnable执行时,是直接手动调用的 run(),而不是 start()方法.
上一点应该是为了,能控制在run()结束后(包括异常终止),都会自动执行Worker.dispose().

而返回的Worker对象也会被parent.setDisposable(…)加入管理中,以便在手动dispose()时能取消线程里的工作。

我们总结一下subscribeOn(Schedulers.xxx())的过程:

返回一个ObservableSubscribeOn包装类对象
上一步返回的对象被订阅时,回调该类中的subscribeActual()方法,在其中会立刻将线程切换到对应的Schedulers.xxx()线程。
在切换后的线程中,执行source.subscribe(parent);,对上游(终点)Observable订阅
上游(终点)Observable开始发送数据,根据RxJava2 源码解析(一),上游发送数据仅仅是调用下游观察者对应的onXXX()方法而已,所以此时操作是在切换后的线程中进行。

一点扩展,
大家可能看过一个结论:
subscribeOn(Schedulers.xxx())切换线程N次,总是以*次为准,或者说离源Observable*近的那次为准,并且对其上面的代码生效(这一点对比的ObserveOn())。

为什么?
– 因为根据RxJava2 源码解析(一)中提到,订阅流程从下游往上游传递
– 在subscribeActual()里开启了Scheduler的工作,source.subscribe(parent);,从这一句开始切换了线程,所以在这之上的代码都是在切换后的线程里的了。
– 但如果连续切换,*上面的切换*晚执行,此时线程变成了*上面的subscribeOn(xxxx)指定的线程,
– 而数据push时,是从上游到下游的,所以会在离源头*近的那次subscribeOn(xxxx)的线程里push数据(onXXX())给下游。

可写如下代码验证:

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, “subscribe() called with: e = [” + e + “]” + Thread.currentThread());
e.onNext(“1”);
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
//依然是io线程
Log.d(TAG, “apply() called with: s = [” + s + “]” + Thread.currentThread());
return s;
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe() called with: d = [” + d + “]”);
}
@Override
public void onNext(String value) {
Log.d(TAG, “onNext() called with: value = [” + value + “]”);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, “onError() called with: e = [” + e + “]”);
}
@Override
public void onComplete() {
Log.d(TAG, “onComplete() called”);
}
});

线程调度observeOn

在上一节的基础上,增加一个observeOn(AndroidSchedulers.mainThread()),就完成了观察者线程的切换。

.subscribeOn(Schedulers.computation())
//在上一节的基础上,增加一个ObserveOn
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {

继续看源码吧,我已经能猜出来了,hook+new XXXObservable();

public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
….
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

果然,查看ObservableObserveOn,:
高能预警,这部分的代码 有些略多,建议读者打开源码边看边读。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
//本例是 AndroidSchedulers.mainThread()
final Scheduler scheduler;
//默认false
final boolean delayError;
//默认128
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
// false
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//1 创建出一个 主线程的Worker
Scheduler.Worker w = scheduler.createWorker();
//2 订阅上游数据源,
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

本例中,就是两步:

创建一个AndroidSchedulers.mainThread()对应的Worker
用ObserveOnObserver订阅上游数据源。这样当数据从上游push下来,会由ObserveOnObserver对应的onXXX()处理。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//下游的观察者
final Observer<? super T> actual;
//对应Scheduler里的Worker
final Scheduler.Worker worker;
//上游被观察者 push 过来的数据都存在这里
SimpleQueue<T> queue;
Disposable s;
//如果onError了,保存对应的异常
Throwable error;
//是否完成
volatile boolean done;
//是否取消
volatile boolean cancelled;
// 代表同步发送 异步发送
int sourceMode;
….
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//省略大量无关代码
//创建一个queue 用于保存上游 onNext() push的数据
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//回调下游观察者onSubscribe方法
actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
//1 执行过error / complete 会是true
if (done) {
return;
}
//2 如果数据源类型不是异步的, 默认不是
if (sourceMode != QueueDisposable.ASYNC) {
//3 将上游push过来的数据 加入 queue里
queue.offer(t);
}
//4 开始进入对应Workder线程,在线程里 将queue里的t 取出 发送给下游Observer
schedule();
}

@Override
public void onError(Throwable t) {
//已经done 会 抛异常 和 上一篇文章里提到的一样
if (done) {
RxJavaPlugins.onError(t);
return;
}
//给error存个值
error = t;
done = true;
//开始调度
schedule();
}

@Override
public void onComplete() {
//已经done 会 返回  不会crash 和上一篇文章里提到的一样
if (done) {
return;
}
done = true;
//开始调度
schedule();
}

void schedule() {
if (getAndIncrement() == 0) {
//该方法需要传入一个线程, 注意看本类实现了Runnable的接口,所以查看对应的run()方法
worker.schedule(this);
}
}
//从这里开始,这个方法已经是在Workder对应的线程里执行的了
@Override
public void run() {
//默认是false
if (outputFused) {
drainFused();
} else {
//取出queue里的数据 发送
drainNormal();
}
}

void drainNormal() {
int missed = 1;

final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;

for (;;) {
// 1 如果已经 终止 或者queue空,则跳出函数,
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}

for (;;) {
boolean d = done;
T v;

try {
//2 从queue里取出一个值
v = q.poll();
} catch (Throwable ex) {
//3 异常处理 并跳出函数
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
//4 再次检查 是否 终止  如果满足条件 跳出函数
if (checkTerminated(d, empty, a)) {
return;
}
//5 上游还没结束数据发送,但是这边处理的队列已经是空的,不会push给下游 Observer
if (empty) {
//仅仅是结束这次循环,不发送这个数据而已,并不会跳出函数
break;
}
//6 发送给下游了
a.onNext(v);
}

//7 对不起这里我也不是很明白,大致猜测是用于 同步原子操作 如有人知道 烦请告知
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

//检查 是否 已经 结束(error complete), 是否没数据要发送了(empty 空),
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
//如果已经disposed
if (cancelled) {
queue.clear();
return true;
}
// 如果已经结束
if (d) {
Throwable e = error;
//如果是延迟发送错误
if (delayError) {
//如果空
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
//停止worker(线程)
worker.dispose();
return true;
}
} else {
//发送错误
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
//发送complete
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}

核心处都加了注释,总结起来就是,

ObserveOnObserver实现了Observer和Runnable接口。
在onNext()里,先不切换线程,将数据加入队列queue。然后开始切换线程,在另一线程中,从queue里取出数据,push给下游Observer
onError() onComplete()除了和RxJava2 源码解析(一)提到的一样特性之外,也是将错误/完成信息先保存,切换线程后再发送。
所以observeOn()影响的是其下游的代码,且多次调用仍然生效。
因为其切换线程代码是在Observer里onXXX()做的,这是一个主动的push行为(影响下游)。
关于多次调用生效问题。对比subscribeOn()切换线程是在subscribeActual()里做的,只是主动切换了上游的订阅线程,从而影响其发射数据时所在的线程。而直到真正发射数据之前,任何改变线程的行为,都会生效(影响发射数据的线程)。所以subscribeOn()只生效一次。observeOn()是一个主动的行为,并且切换线程后会立刻发送数据,所以会生效多次.

转载请标明出处:
http://blog.csdn.net/zxt0601/article/details/61637439
本文出自:【张旭童的博客】(http://blog.csdn.net/zxt0601)

总结

本文带大家走读分析了三个东西:

map操作符原理:

内部对上游Observable进行订阅
内部订阅者接收到数据后,将数据转换,发送给下游Observer.
操作符返回的Observable和其内部订阅者、是装饰者模式的体现。
操作符数据变换的操作,也是发生在订阅后。

线程调度subscribeOn():

内部先切换线程,在切换后的线程中对上游Observable进行订阅,这样上游发送数据时就是处于被切换后的线程里了。
也因此多次切换线程,*后一次切换(离源数据*近)的生效。
内部订阅者接收到数据后,直接发送给下游Observer.
引入内部订阅者是为了控制线程(dispose)
线程切换发生在Observable中。

线程调度observeOn():

使用装饰的Observer对上游Observable进行订阅
在Observer中onXXX()方法里,将待发送数据存入队列,同时请求切换线程处理真正push数据给下游。
多次切换线程,都会对下游生效。

源码里那些实现了Runnable的类或者匿名内部类,*终并没有像往常那样被丢给Thread类执行。
而是先切换线程,再直接执行Runnable的run()方法。
这也加深了我对面向对象,对抽象、Runnable的理解,它就是一个简简单单的接口,里面就一个简简单单的run(),
我认为,之所以有Runnable,只是抽象出 一个可运行的任务的概念。
也许这句话很平淡,书上也会提到,各位大佬早就知道,但是如今我顺着RxJava2的源码这么走读了一遍,确真真切切的感受到了这些设计思想的美妙。
———————

RxJava2 源码解析(一)

概述

*近决定找工作,准备面试中,打算看一看RxJava2的源码,遂有了这篇文章。

不会对RxJava2的源码逐字逐句的阅读,只寻找关键处,我们平时接触得到的那些代码。
背压实际中接触较少,故只分析了Observable.
分析的源码版本为:2.0.1

我们的目的:

知道源头(Observable)是如何将数据发送出去的。
知道终点(Observer)是如何接收到数据的。
何时将源头和终点关联起来的
知道线程调度是怎么实现的
知道操作符是怎么实现的

本文先达到目的1 ,2 ,3。
我个人认为主要还是适配器模式的体现,我们接触的就只有Observable和Observer,其实内部有大量的中间对象在适配:将它们两联系起来,加入一些额外功能,例如考虑dispose和hook等。
从create开始。

这是一段不涉及操作符和线程切换的简单例子:

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(“1”);
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe() called with: d = [” + d + “]”);
}

@Override
public void onNext(String value) {
Log.d(TAG, “onNext() called with: value = [” + value + “]”);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, “onError() called with: e = [” + e + “]”);
}

@Override
public void onComplete() {
Log.d(TAG, “onComplete() called”);
}
});

拿 create来说,

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//…..
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

返回值是Observable,参数是ObservableOnSubscribe,定义如下:

public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}

ObservableOnSubscribe是一个接口,里面就一个方法,也是我们实现的那个方法:
该方法的参数是 ObservableEmitter,我认为它是关联起 Disposable概念的一层:

public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
}

ObservableEmitter也是一个接口。里面方法很多,它也继承了 Emitter<T> 接口。

public interface Emitter<T> {
void onNext(T value);
void onError(Throwable error);
void onComplete();
}

Emitter<T>定义了 我们在ObservableOnSubscribe中实现subscribe()方法里*常用的三个方法。

好,我们回到原点,create()方法里就一句话return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));,其中提到RxJavaPlugins.onAssembly():

/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook’s input value
* @return the value returned by the hook
*/
@SuppressWarnings({ “rawtypes”, “unchecked” })
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

可以看到这是一个关于hook的方法,关于hook我们暂且不表,不影响主流程,我们默认使用中都没有hook,所以这里就是直接返回source,即传入的对象,也就是new ObservableCreate<T>(source).

ObservableCreate我认为算是一种适配器的体现,create()需要返回的是Observable,而我现在有的是(方法传入的是)ObservableOnSubscribe对象,ObservableCreate将ObservableOnSubscribe适配成Observable。
其中subscribeActual()方法表示的是被订阅时真正被执行的方法,放后面解析:

public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

OK,至此,创建流程结束,我们得到了Observable<T>对象,其实就是ObservableCreate<T>.
到订阅subscribe 结束

subscribe():

public final void subscribe(Observer<? super T> observer) {

try {
//1 hook相关,略过
observer = RxJavaPlugins.onSubscribe(this, observer);

//2 真正的订阅处
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
//3 错误处理,
Exceptions.throwIfFatal(e);
// can’t call onError because no way to know if a Disposable has been set or not
// can’t call onSubscribe because the call might have set a Subscription already
//4 hook错误相关,略过
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException(“Actually not, but can’t throw other exceptions due to RS”);
npe.initCause(e);
throw npe;
}
}

关于hook的代码:
可以看到如果没有hook,即相应的对象是null,则是传入什么返回什么的。

/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook’s input value
* @param observer the observer
* @return the value returned by the hook
*/
@SuppressWarnings({ “rawtypes”, “unchecked” })
public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
//1 默认onObservableSubscribe(可理解为一个flatmap的操作)是null
BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;
//2 所以这句跳过,不会对其进行apply
if (f != null) {
return apply(f, source, observer);
}
//3 返回参数2
return observer;
}

我也是验证了一下 三个Hook相关的变量,确实是null:

Consumer<Throwable> errorHandler = RxJavaPlugins.getErrorHandler();
BiFunction<Observable, Observer, Observer> onObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
Function<Observable, Observable> onObservableAssembly = RxJavaPlugins.getOnObservableAssembly();

Log.e(TAG, “errorHandler = [” + errorHandler + “]”);
Log.e(TAG, “onObservableSubscribe = [” + onObservableSubscribe + “]”);
Log.e(TAG, “onObservableAssembly = [” + onObservableAssembly + “]”);

所以订阅时的重点就是:

//2 真正的订阅处
subscribeActual(observer);

我们将*节提到的ObservableCreate里的subscribeActual()方法拿出来看看:

@Override
protected void subscribeActual(Observer<? super T> observer) {
//1 创建CreateEmitter,也是一个适配器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2 onSubscribe()参数是Disposable ,所以CreateEmitter可以将Observer->Disposable 。还有一点要注意的是`onSubscribe()`是在我们执行`subscribe()`这句代码的那个线程回调的,并不受线程调度影响。
observer.onSubscribe(parent);
try {
//3 将ObservableOnSubscribe(源头)与CreateEmitter(Observer,终点)联系起来
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
//4 错误回调
parent.onError(ex);
}
}

Observer是一个接口,里面就四个方法,我们在开头的例子中已经全部实现(打印Log)。

public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T value);
void onError(Throwable e);
void onComplete();
}

重点在这一句:

//3 将ObservableOnSubscribe(源头)与CreateEmitter(Observer,终点)联系起来
source.subscribe(parent);

source即ObservableOnSubscribe对象,在本文中是:

new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(“1”);
e.onComplete();
}
}

则会调用parent.onNext() 和parent.onComplete(),parent是CreateEmitter对象,如下:

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {

//如果没有被dispose,会调用Observer的onNext()方法
if (!isDisposed()) {
observer.onNext(t);
}
}

@Override
public void onError(Throwable t) {

//1 如果没有被dispose,会调用Observer的onError()方法
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
//2 一定会自动dispose()
dispose();
}
} else {
//3 如果已经被dispose了,会抛出异常。所以onError、onComplete彼此互斥,只能被调用一次
RxJavaPlugins.onError(t);
}
}

@Override
public void onComplete() {
//1 如果没有被dispose,会调用Observer的onComplete()方法
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
//2 一定会自动dispose()
dispose();
}
}
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}

总结重点:

Observable和Observer的关系没有被dispose,才会回调Observer的onXXXX()方法
Observer的onComplete()和onError() 互斥只能执行一次,因为CreateEmitter在回调他们两中任意一个后,都会自动dispose()。根据*点,验证此结论。
Observable和Observer关联时(订阅时),Observable才会开始发送数据。
ObservableCreate将ObservableOnSubscribe(真正的源)->Observable.
ObservableOnSubscribe(真正的源)需要的是发射器ObservableEmitter.
CreateEmitter将Observer->ObservableEmitter,同时它也是Disposable.
先error后complete,complete不显示。 反之会crash,感兴趣的可以写如下代码验证。

e.onNext(“1”);
//先error后complete,complete不显示。 反之 会crash
//e.onError(new IOException(“sb error”));
e.onComplete();
e.onError(new IOException(“sb error”));

一个好玩的地方DisposableHelper

原本到这里,*简单的一个流程我们算是搞清了。
还值得一提的是,DisposableHelper.dispose(this);
DisposableHelper很有趣,它是一个枚举,这是利用枚举实现了一个单例disposed state,即是否disposed,如果Disposable类型的变量的引用等于DISPOSED,则起点和终点已经断开联系。
其中大多数方法 都是静态方法,所以isDisposed()方法的实现就很简单,直接比较引用即可.
其他的几个方法,和AtomicReference类搅基在了一起。
这是一个实现引用原子操作的类,对象引用的原子更新,常用方法如下:

//返回当前的引用。
V get()
//如果当前值与给定的expect引用相等,(注意是引用相等而不是equals()相等),更新为指定的update值。
boolean compareAndSet(V expect, V update)
//原子地设为给定值并返回旧值。
V getAndSet(V newValue)

OK,铺垫完了我们看看源码吧:

public enum DisposableHelper implements Disposable {
/**
* The singleton instance representing a terminal, disposed state, don’t leak it.
*/
DISPOSED
;

public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}

public static boolean dispose(AtomicReference<Disposable> field) {
//1 通过断点查看,默认情况下,field的值是”null”,并非引用是null哦!大坑大坑大坑
//但是current是null引用
Disposable current = field.get();
Disposable d = DISPOSED;
//2 null不等于DISPOSED
if (current != d) {
//3 field是DISPOSED了,current还是null
current = field.getAndSet(d);
if (current != d) {
//4 默认情况下 走不到这里,这里是在设置了setCancellable()后会走到。
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}

总结

在subscribeActual()方法中,源头和终点关联起来。
source.subscribe(parent);这句代码执行时,才开始从发送ObservableOnSubscribe中利用ObservableEmitter发送数据给Observer。即数据是从源头push给终点的。
CreateEmitter 中,只有Observable和Observer的关系没有被dispose,才会回调Observer的onXXXX()方法
Observer的onComplete()和onError() 互斥只能执行一次,因为CreateEmitter在回调他们两中任意一个后,都会自动dispose()。根据上一点,验证此结论。
先error后complete,complete不显示。 反之会crash
还有一点要注意的是onSubscribe()是在我们执行subscribe()这句代码的那个线程回调的,并不受线程调度影响。
———————

这可能是*好的RxJava 2.x 入门教程

前言

RxJava 对大家而言肯定不陌生,其受欢迎程度不言而喻。而在去年的早些时候,官方便宣布,将在一段时间后不再对 RxJava 1.x 进行维护,而在仓库中另辟蹊径,开始对 RxJava 2.x 进行推广起来,我原本是不想写这么一套教程的,因为 RxJava 受欢迎度这么高,而且这 2.x 也出来了这么久,我坚信网上一定有很多超级大牛早已为大家避雷。然而很难过的是,我搜索了些时间,能搜出来的基本都是对 RxJava 1.x 的讲解,或者是 Blog 标题就没说清楚是否是 2.x 系列(对于我们这种标题党来说很难受)。这不,我就来抛砖引玉了。

咱们先不提别的,先为大家带点可能你早已熟知的干货——来自扔物线大神的给Android开发者的 RxJava 详解。

该文详细地为大家讲解了 RxJava 的优势、原理以及使用方式和适用情景,一定被众多的 Android 开发者视为神器。可惜,文章历史比较久远,基本都是讲解的 RxJava 1.x了。

那关注的小伙伴一定会问,那我没用过 RxJava 1.x ,还有必要先学习 1.x 的内容吗?

个人觉得不必要,因为 RxJava 2.x 是按照 Reactive-Streams specification 规范完全的重写的,完全独立于 RxJava 1.x 而存在,它改变了以往 RxJava 的用法。

额,由于个人能力水平有限,所以对于英文基础好的,大家可以去官网查阅相关 API 介绍,而对于英文不那么流畅的童鞋,我也为大家准备了干货:RxJava2Examples (正在更新)。

与RxJava 1.x的差异

其实,我标题为入门教程,按理说应该从简单入门开始讲的,原谅我突然偏题了,因为我觉得可能大多数人都了解或者使用过RxJava 1.x(因为它真的太棒了)。虽然可能熟悉1.x 的你可以直接扒文档就可以了,但这么大的变化,请原谅我还在这里瞎比比。

  • Nulls
    这是一个很大的变化,熟悉 RxJava 1.x 的童鞋一定都知道,1.x 是允许我们在发射事件的时候传入 null 值的,但现在我们的 2.x 不支持了,不信你试试? 大大的 NullPointerException 教你做人。这意味着 Observable<Void> 不再发射任何值,而是正常结束或者抛出空指针。
  • 2、Flowable
    在 RxJava 1.x 中关于介绍 backpressure 部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用 Observable 。而在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压。
    或许对于背压,有些小伙伴们还不是特别理解,这里简单说一下。大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。感兴趣的小伙伴可以模拟这种情况,在差距太大的时候,我们的内存会猛增,直到OOM。而我们的 Flowable 一定意义上可以解决这样的问题,但其实并不能完全解决,这个后面可能会提到。
  • Single/Completable/Maybe
    其实这三者都差不多,Single 顾名思义,只能发送一个事件,和 Observable接受可变参数完全不同。而 Completable 侧重于观察结果,而 Maybe 是上面两种的结合体。也就是说,当你只想要某个事件的结果(true or false)的时候,你可以使用这种观察者模式。
  • 线程调度相关
    这一块基本没什么改动,但细心的小伙伴一定会发现,RxJava 2.x 中已经没有了 Schedulers.immediate()这个线程环境,还有 Schedulers.test()
  • Function相关
    熟悉 1.x 的小伙伴一定都知道,我们在1.x 中是有 Func1Func2…..FuncN的,但 2.x 中将它们移除,而采用 Function 替换了 Func1,采用 BiFunction 替换了 Func 2..N。并且,它们都增加了 throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要 try-catch 了。
  • 其他操作符相关
    如 Func1...N 的变化,现在同样用 Consumer 和 BiConsumer 对 Action1 和 Action2 进行了替换。后面的 Action 都被替换了,只保留了 ActionN

附录

下面从官方截图展示 2.x 相对 1.x 的改动细节,仅供参考。

%title插图%num
%title插图%num
%title插图%num
%title插图%num
%title插图%num
%title插图%num
%title插图%num
%title插图%num