RxJava 简介

前言

提升开发效率,降低维护成本一直是开发团队永恒不变的宗旨。近一年来国内的技术圈子中越来越多的开始提及Rx,经过一段时间的学习和探索之后我也深深的感受到了RxJava的魅力。它能帮助我们简化代码逻辑,提升代码可读性。这对于开发效率的提升、后期维护成本的降低帮助都是巨大的。个人预测RxJava一定是2016年的一个大趋势,所以也有打算将它引入到公司现有的项目中来,写这一系列的文章主要也是为了团队内部做技术分享。

由于我本人是个Android程序猿,因此这一系列文章中的场景都是基于Android平台的。如果你是个Java Web工程师或者是其它方向的那也没关系,我会尽量用通俗的语言将问题描述清楚。

响应式编程

在介绍RxJava前,我们先聊聊响应式编程。那么什么是响应式编程呢?响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。

响应式编程的一个关键概念是事件。事件可以被等待,可以触发过程,也可以触发其它事件。事件是唯一的以合适的方式将我们的现实世界映射到我们的软件中:如果屋里太热了我们就打开一扇窗户。同样的,当我们的天气app从服务端获取到新的天气数据后,我们需要更新app上展示天气信息的UI;汽车上的车道偏移系统探测到车辆偏移了正常路线就会提醒驾驶者纠正,就是是响应事件。

今天,响应式编程*通用的一个场景是UI:我们的移动App必须做出对网络调用、用户触摸输入和系统弹框的响应。在这个世界上,软件之所以是事件驱动并响应的是因为现实生活也是如此。

本章节中部分概念摘自《RxJava Essentials》一书

RxJava的来历

Rx是微软.Net的一个响应式扩展,Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。2012年Netflix为了应对不断增长的业务需求开始将.NET Rx迁移到JVM上面。并于13年二月份正式向外展示了RxJava。
从语义的角度来看,RxJava就是.NET Rx。从语法的角度来看,Netflix考虑到了对应每个Rx方法,保留了Java代码规范和基本的模式。

%title插图%num
RxJava来历

什么是RxJava

那么到底什么是RxJava呢?我对它的定义是:RxJava本质上是一个异步操作库,是一个能让你用*其简洁的逻辑去处理繁琐复杂任务的异步事件库。

RxJava好在哪

Android平台上为已经开发者提供了AsyncTask,Handler等用来做异步操作的类库,那我们为什么还要选择RxJava呢?答案是简洁!RxJava可以用非常简洁的代码逻辑来解决复杂问题;而且即使业务逻辑的越来越复杂,它依然能够保持简洁!再配合上Lambda用简单的几行代码分分钟就解决你负责的业务问题。简直逼格爆表,拿它装逼那是*好的!

多说无益,上代码!

假设我们安居客用户App上有个需求,需要从服务端拉取上海浦东新区塘桥板块的所有小区Community[] communities,每个小区下包含多套房源List<House> houses;我们需要把塘桥板块的所有总价大于500W的房源都展示在App的房源列表页。用于从服务端拉取communities需要发起网络请求,比较耗时,因此需要在后台运行。而这些房源信息需要展示到App的页面上,因此需要在UI线程上执行。(此例子思路来源于扔物线的给Android开发者的RxJava详解一文)

new Thread() {
        @Override
        public void run() {
            super.run();
            //从服务端获取小区列表
            List<Community> communities = getCommunitiesFromServer();
            for (Community community : communities) {
                List<House> houses = community.houses;
                for (House house : houses) {
                    if (house.price >= 5000000) {
                        runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                //将房子的信息添加到屏幕上
                                addHouseInformationToScreen(house);
                            }
                        });
                    }
                }
            }
        }
    }.start();

使用RxJava的写法是这样的:

Observable.from(getCommunitiesFromServer())
            .flatMap(new Func1<Community, Observable<House>>() {
                @Override
                public Observable<House> call(Community community) {
                    return Observable.from(community.houses);
                }
            }).filter(new Func1<House, Boolean>() {
                @Override
                public Boolean call(House house) {
                    return house.price>=5000000;
                }
            }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<House>() {
                @Override
                public void call(House house) {
                    //将房子的信息添加到屏幕上
                    addHouseInformationToScreen(house);
                }
            });

从上面这段代码我们可以看到:虽然代码量看起来变复杂了,但是RxJava的实现是一条链式调用,没有任何的嵌套;整个实现逻辑看起来异常简洁清晰,这对我们的编程实现和后期维护是有巨大帮助的。特别是对于那些回调嵌套的场景。配合Lambda表达式还可以简化成这样:

  Observable.from(getCommunitiesFromServer())
            .flatMap(community -> Observable.from(community.houses))
            .filter(house -> house.price>=5000000).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::addHouseInformationToScreen);

简洁!有美感!这才是一个有情怀的程序员应该写出来的代码。

看完这篇文章大家应该能够理解RxJava为什么会越来越火了。它能*大的提高我们的开发效率和代码的可读性!当然了RxJava的学习曲线也是比较陡的,在后面的文章我会对主要的知识点做详细的介绍,敬请关注!

Android 开发者必看的 RxJava 详解

前言

我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 。而*近这几个月,我也发现国内越来越多的人开始提及 RxJava 。有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava 到底是什么?

鉴于 RxJava 目前这种既火爆又神秘的现状,而我又在一年的使用过程中对 RxJava 有了一些理解,我决定写下这篇文章来对 RxJava 做一个相对详细的、针对 Android 开发者的介绍。

这篇文章的目的有两个: 1. 给对 RxJava 感兴趣的人一些入门的指引 2. 给正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析

  • RxJava 到底是什么
  • RxJava 好在哪
  • API 介绍和原理简析
    • 1. 概念:扩展的观察者模式
      • 观察者模式
      • RxJava 的观察者模式
    • 2. 基本实现
      • 1) 创建 Observer
      • 2) 创建 Observable
      • 3) Subscribe (订阅)
      • 4) 场景示例
        • a. 打印字符串数组
        • b. 由 id 取得图片并显示
    • 3. 线程控制 —— Scheduler (一)
      • 1) Scheduler 的 API (一)
      • 2) Scheduler 的原理 (一)
    • 4. 变换
      • 1) API
      • 2) 变换的原理:lift()
      • 3) compose: 对 Observable 整体的变换
    • 5. 线程控制:Scheduler (二)
      • 1) Scheduler 的 API (二)
      • 2) Scheduler 的原理(二)
      • 3) 延伸:doOnSubscribe()
  • RxJava 的适用场景和使用方式
    • 1. 与 Retrofit 的结合
    • 2. RxBinding
    • 3. 各种异步操作
    • 4. RxBus
  • *后
    • 关于作者:
    • 为什么写这个?

在正文开始之前的*后,放上 GitHub 链接和引入依赖的 gradle 代码: Github:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
引入依赖:
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
(版本号是文章发布时的*新稳定版)

另外,感谢 RxJava 核心成员流火枫林的技术支持和内测读者代码家、鲍永章、drakeet、马琳、有时放纵、程序亦非猿、大头鬼、XZoomEye、席德雨、TCahead、Tiiime、Ailurus、宅学长、妖孽、大大大大大臣哥、NicodeLee的帮助,以及周伯通招聘的赞助。

RxJava 到底是什么

一个词:异步

RxJava 在 GitHub 主页上的自我介绍是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

然而,对于初学者来说,这太难看懂了。因为它是一个『总结』,而初学者更需要一个『引言』。

其实, RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,而别的定语都是基于这之上的。

RxJava 好在哪

换句话说,『同样是做异步,为什么人们用它,而不用现成的 AsyncTask / Handler / XXX / … ?』

一个词:简洁

异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。

举个例子

假设有这样一个需求:界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。常用的实现方式有多种,我这里贴出其中一种:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

而如果使用 RxJava ,实现方式是这样的:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

那位说话了:『你这代码明明变多了啊!简洁个毛啊!』大兄弟你消消气,我说的是逻辑的简洁,不是单纯的代码量少(逻辑简洁才是提升读写代码速度的必杀技对不?)。观察一下你会发现, RxJava 的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显(试想如果还要求只选取前 10 张图片,常规方式要怎么办?如果有更多这样那样的要求呢?再试想,在这一大堆需求实现完两个月之后需要改功能,当你翻回这里看到自己当初写下的那一片迷之缩进,你能保证自己将迅速看懂,而不是对着代码重新捋一遍思路?)。

另外,如果你的 IDE 是 Android Studio ,其实每次打开某个 Java 文件的时候,你会看到被自动 Lambda 化的预览,这将让你更加清晰地看到程序逻辑:

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

如果你习惯使用 Retrolambda ,你也可以直接把代码写成上面这种简洁的形式。而如果你看到这里还不知道什么是 Retrolambda ,我不建议你现在就去学习它。原因有两点:1. Lambda 是把双刃剑,它让你的代码简洁的同时,降低了代码的可读性,因此同时学习 RxJava 和 Retrolambda 可能会让你忽略 RxJava 的一些技术细节;2. Retrolambda 是 Java 6/7 对 Lambda 表达式的非官方兼容方案,它的向后兼容性和稳定性是无法保障的,因此对于企业项目,使用 Retrolambda 是有风险的。所以,与很多 RxJava 的推广者不同,我并不推荐在学习 RxJava 的同时一起学习 Retrolambda。事实上,我个人虽然很欣赏 Retrolambda,但我从来不用它。

在Flipboard 的 Android 代码中,有一段逻辑非常复杂,包含了多次内存操作、本地文件操作和网络操作,对象分分合合,线程间相互配合相互等待,一会儿排成人字,一会儿排成一字。如果使用常规的方法来实现,肯定是要写得欲仙欲死,然而在使用 RxJava 的情况下,依然只是一条链式调用就完成了。它很长,但很清晰。

所以, RxJava 好在哪?就好在简洁,好在那把什么复杂逻辑都能穿成一条线的简洁。

API 介绍和原理简析

这个我就做不到一个词说明了……因为这一节的主要内容就是一步步地说明 RxJava 到底怎样做到了异步,怎样做到了简洁。

1. 概念:扩展的观察者模式

RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。

观察者模式

先简述一下观察者模式,已经熟悉的可以跳过这一段。

观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。 Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到*高的反馈速度。当然,这也得益于我们可以随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』。

OnClickListener 的模式大致如下图:

OnClickListener 观察者模式

如图所示,通过 setOnClickListener() 方法,Button 持有 OnClickListener 的引用(这一过程没有在图上画出);当用户点击时,Button 自动调用 OnClickListener 的 onClick() 方法。另外,如果把这张图中的概念抽象出来(Button -> 被观察者、OnClickListener -> 观察者、setOnClickListener() -> 订阅,onClick() -> 事件),就由专用的观察者模式(例如只用于监听控件点击)转变成了通用的观察者模式。如下图:

通用观察者模式

而 RxJava 作为一个工具库,使用的就是通用形式的观察者模式。

RxJava 的观察者模式

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  • 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的*后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

RxJava 的观察者模式大致如下图:

RxJava 的观察者模式

2. 基本实现

基于以上的概念, RxJava 的基本实现主要有三点:

1) 创建 Observer

Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的实现方式:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:

  1. onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
  2. unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以*好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
2) 创建 Observable

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

这个例子很简单:事件的内容是字符串,而不是一些复杂的对象;事件的内容是已经定好了的,而不像有的观察者模式一样是待确定的(例如网络请求的结果在请求返回之前是未知的);所有事件在一瞬间被全部发送出去,而不是夹杂一些确定或不确定的时间间隔或者经过某种触发器来触发的。总之,这个例子看起来毫无实用价值。但这是为了便于说明,实质上只要你想,各种各样的事件发送规则你都可以自己来写。至于具体怎么做,后面都会讲到,但现在不行。只有把基础原理先说明白了,上层的运用才能更容易说清楚。

create() 方法是 RxJava *基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如:

  • just(T...): 将传入的参数依次发送出来。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
  • from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等价的。

3) Subscribe (订阅)

创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

有人可能会注意到, subscribe() 这个方法有点怪:它看起来是『observalbe 订阅了 observer / subscriber』而不是『observer / subscriber 订阅了 observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成 observer.subscribe(observable) / subscriber.subscribe(observable) ,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。

Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

可以看到,subscriber() 做了3件事:

  1. 调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
  2. 调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
  3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().

整个过程中对象间的关系如下图:

关系静图

或者可以看动图:

关系静图

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。形式如下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的 Action1 和 Action0。 Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。 Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用*广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2Action3) 的,它们可以被用以包装不同的无返回值的方法。

注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 过程中*终会被转换成 Subscriber 对象,因此,从这里开始,后面的描述我将用 Subscriber 来代替 Observer ,这样更加严谨。

4) 场景示例

下面举两个例子:

为了把原理用更清晰的方式表述出来,本文中挑选的都是功能尽可能简单的例子,以至于有些示例代码看起来会有『画蛇添足』『明明不用 RxJava 可以更简便地解决问题』的感觉。当你看到这种情况,不要觉得是因为 RxJava 太啰嗦,而是因为在过早的时候举出真实场景的例子并不利于原理的解析,因此我刻意挑选了简单的情景。

a. 打印字符串数组

将字符串数组 names 中的所有字符串依次打印出来:

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });
b. 由 id 取得图片并显示

由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中,并在出现异常的时候打印 Toast 报错:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

正如上面两个例子这样,创建出 Observable 和 Subscriber ,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完成了。非常简单。

然而,

这并没有什么diao用

在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler 。

3. 线程控制 —— Scheduler (一)

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

1) Scheduler 的 API (一)

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

文字叙述总归难理解,上代码:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

上面这段代码中,由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1234 将会在 IO 线程发出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 数字的打印将发生在主线程 。事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

而前面提到的由图片 id 取得图片并显示的例子,如果也加上这两句:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

那么,加载图片将会发生在 IO 线程,而设置图片则被设定在了主线程。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。

2) Scheduler 的原理 (一)

RxJava 的 Scheduler API 很方便,也很神奇(加了一句话就把线程切换了,怎么做到的?而且 subscribe() 不是*外层直接调用的方法吗,它竟然也能被指定线程?)。然而 Scheduler 的原理需要放在后面讲,因为它的原理是以下一节《变换》的原理作为基础的。

好吧这一节其实我屁也没说,只是为了让你安心,让你知道我不是忘了讲原理,而是把它放在了更合适的地方。

4. 变换

终于要到牛逼的地方了,不管你激动不激动,反正我是激动了。

RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的*大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 API。

1) API

首先看一个 map() 的例子:

Observable.just("images/logo.png") // 输入类型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 参数类型 String
            return getBitmapFromPath(filePath); // 返回类型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 参数类型 Bitmap
            showBitmap(bitmap);
        }
    });

这里出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。

可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。这种直接变换对象并返回的,是*常见的也*容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。我列举几个常用的变换:

  • map(): 事件对象的直接变换,具体功能上面已经介绍过。它是 RxJava *常用的变换。 map() 的示意图: map() 示意图
  • flatMap(): 这是一个很有用但非常难理解的变换,因此我决定花多些篇幅来介绍它。 首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);

很简单。那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);

依然很简单。那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢?

这个时候,就需要用 flatMap() 了:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

flatMap() 示意图:

flatMap() 示意图

扩展:由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):

networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // 处理显示消息列表
            showMessages(messages);
        }
    });

传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。

  • throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器: RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms .subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。

此外, RxJava 还提供很多便捷的方法来实现事件序列的变换,这里就不一一举例了。

2) 变换的原理:lift()

这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):

// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——

  • subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。
  • 当含有 lift() 时:
    1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;
    2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe
    3.当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe
    4.而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 SubscriberOperator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。
    这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber

如果你更喜欢具象思维,可以看图:

lift() 原理图

或者可以看动图:

lift 原理动图

两次和多次的 lift() 同理,如下图:

两次 lift

举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子,仅供参考:

observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 将事件序列中的 Integer 对象转换为 String 对象
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

讲述 lift() 的原理只是为了让你更好地了解 RxJava ,从而可以更好地使用它。然而不管你是否理解了 lift() 的原理,RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。

3) compose: 对 Observable 整体的变换

除了 lift() 之外, Observable 还有一个变换方法叫做 compose(Transformer)。它和 lift() 的区别在于, lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。你可以这么写:

observable1
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
observable2
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);
observable3
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);
observable4
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);

你觉得这样太不软件工程了,于是你改成了这样:

private Observable liftAll(Observable observable) {
    return observable
        .lift1()
        .lift2()
        .lift3()
        .lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);

可读性、可维护性都提高了。可是 Observable 被一个方法包起来,这种方式对于 Observale 的灵活性似乎还是增添了那么点限制。怎么办?这个时候,就应该用 compose() 来解决了:

public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。

compose() 的原理比较简单,不附图喽。

5. 线程控制:Scheduler (二)

除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。

1) Scheduler 的 API (二)

前面讲到了,可以利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap() 等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?

答案是:能。因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。上代码:

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。

不过,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn() 呢?会有什么效果?

这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。

2) Scheduler 的原理(二)

其实, subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程):

subscribeOn() 原理图:

subscribeOn() 原理

observeOn() 原理图:

observeOn() 原理

从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 “schedule…” 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。

*后,我用一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整):

线程控制综合调用

图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受*个 subscribeOn() 影响,运行在红色线程;③和④处受*个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被*个 subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有*个 subscribeOn() 起作用。

3) 延伸:doOnSubscribe()

然而,虽然超过一个的 subscribeOn() 对事件处理的流程没有影响,但在流程之前却是可以利用的。

在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart()由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。

而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它*近的 subscribeOn() 所指定的线程。

示例代码:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。

RxJava 的适用场景和使用方式

1. 与 Retrofit 的结合

Retrofit 是 Square 的一个著名的网络请求库。没有用过 Retrofit 的可以选择跳过这一小节也没关系,我举的每种场景都只是个例子,而且例子之间并无前后关联,只是个抛砖引玉的作用,所以你跳过这里看别的场景也可以的。

Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本的 Observable 形式 API。下面我用对比的方式来介绍 Retrofit 的 RxJava 版 API 和传统版本的区别。

以获取一个 User 对象的接口作为例子。使用Retrofit 的传统 API,你可以用这样的方式来定义请求:

@GET("/user")
public void getUser(@Query("userId") String userId, Callback<User> callback);

在程序的构建过程中, Retrofit 会把自动把方法实现并生成代码,然后开发者就可以利用下面的方法来获取特定用户并处理响应:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        userView.setUser(user);
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

而使用 RxJava 形式的 API,定义同样的请求是这样的:

@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);

使用的时候是这样的:

getUser(userId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

看到区别了吗?

当 RxJava 形式的时候,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 或在请求失败后调用 onError()

对比来看, Callback 形式和 Observable 形式长得不太一样,但本质都差不多,而且在细节上 Observable 形式似乎还比 Callback 形式要差点。那 Retrofit 为什么还要提供 RxJava 的支持呢?

因为它好用啊!从这个例子看不出来是因为这只是*简单的情况。而一旦情景复杂起来, Callback 形式马上就会开始让人头疼。比如:

假设这么一种情况:你的程序取到的 User 并不应该直接显示,而是需要先与数据库中的数据进行比对和修正后再显示。使用 Callback 方式大概可以这么写:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        processUser(user); // 尝试修正 User 数据
        userView.setUser(user);
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

有问题吗?

很简便,但不要这样做。为什么?因为这样做会影响性能。数据库的操作很重,一次读写操作花费 10~20ms 是很常见的,这样的耗时很容易造成界面的卡顿。所以通常情况下,如果可以的话一定要避免在主线程中处理数据库。所以为了提升性能,这段代码可以优化一下:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        new Thread() {
            @Override
            public void run() {
                processUser(user); // 尝试修正 User 数据
                runOnUiThread(new Runnable() { // 切回 UI 线程
                    @Override
                    public void run() {
                        userView.setUser(user);
                    }
                });
            }).start();
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

性能问题解决,但……这代码实在是太乱了,迷之缩进啊!杂乱的代码往往不仅仅是美观问题,因为代码越乱往往就越难读懂,而如果项目中充斥着杂乱的代码,无疑会降低代码的可读性,造成团队开发效率的降低和出错率的升高。

这时候,如果用 RxJava 的形式,就好办多了。 RxJava 形式的代码是这样的:

getUser(userId)
    .doOnNext(new Action1<User>() {
        @Override
        public void call(User user) {
            processUser(user);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

后台代码和前台代码全都写在一条链中,明显清晰了很多。

再举一个例子:假设 /user 接口并不能直接访问,而需要填入一个在线获取的 token ,代码应该怎么写?

Callback 方式,可以使用嵌套的 Callback

@GET("/token")
public void getToken(Callback<String> callback);

@GET("/user")
public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);

...

getToken(new Callback<String>() {
    @Override
    public void success(String token) {
        getUser(token, userId, new Callback<User>() {
            @Override
            public void success(User user) {
                userView.setUser(user);
            }

            @Override
            public void failure(RetrofitError error) {
                // Error handling
                ...
            }
        };
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
});

倒是没有什么性能问题,可是迷之缩进毁一生,你懂我也懂,做过大项目的人应该更懂。

而使用 RxJava 的话,代码是这样的:

@GET("/token")
public Observable<String> getToken();

@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);

...

getToken()
    .flatMap(new Func1<String, Observable<User>>() {
        @Override
        public Observable<User> onNext(String token) {
            return getUser(token, userId);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

用一个 flatMap() 就搞定了逻辑,依然是一条链。看着就很爽,是吧?

2016/03/31 更新,加上我写的一个 Sample 项目:
rengwuxian RxJava Samples

好,Retrofit 部分就到这里。

2. RxBinding

RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API。所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API。

举个设置点击监听的例子。使用 RxBinding ,可以把事件监听用这样的方法来设置:

Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
    .subscribe(new Action1<ViewClickEvent>() {
        @Override
        public void call(ViewClickEvent event) {
            // Click handling
        }
    });

看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener() 来实现的。然而,仅仅这一个形式的改变,却恰好就是 RxBinding 的目的:扩展性。通过 RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。扩展的方式有很多,根据需求而定。一个例子是前面提到过的 throttleFirst() ,用于去抖动,也就是消除手抖导致的快速连环点击:

RxView.clickEvents(button)
    .throttleFirst(500, TimeUnit.MILLISECONDS)
    .subscribe(clickAction);

如果想对 RxBinding 有更多了解,可以去它的 GitHub 项目 下面看看。

3. 各种异步操作

前面举的 Retrofit 和 RxBinding 的例子,是两个可以提供现成的 Observable 的库。而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现,有了之前几章的例子,这里应该不用再举例了。

4. RxBus

RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用 Otto 或者 GreenRobot 的 EventBus。至于什么是 RxBus,可以看这篇文章。顺便说一句,Flipboard 已经用 RxBus 替换掉了 Otto ,目前为止没有不良反应。

*后

对于 Android 开发者来说, RxJava 是一个很难上手的库,因为它对于 Android 开发者来说有太多陌生的概念了。可是它真的很牛逼。因此,我写了这篇《给 Android 开发者的 RxJava 详解》,希望能给始终搞不明白什么是 RxJava 的人一些入门的指引,或者能让正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析。无论如何,只要能给各位同为 Android 工程师的你们提供一些帮助,这篇文章的目的就达到了。

 

为什么写这个?

与两三年前的境况不同,中国现在已经不缺初级 Android 工程师,但中级和高级工程师严重供不应求。因此我决定从今天开始不定期地发布我的技术分享,只希望能够和大家共同提升,通过我们的成长来解决一点点国内互联网公司人才稀缺的困境,也提升各位技术党的收入。所以,不仅要写这篇,我还会写更多。至于内容的定位,我计划只定位真正的干货,一些边边角角的小技巧和炫酷的黑科技应该都不会写,总之希望每篇文章都能帮读者提升真正的实力。

深入浅出RxJava这一篇就够了

前言:

*次接触RxJava是在前不久,一个新Android项目的启动,在评估时选择了RxJava。RxJava是一个基于事件订阅的异步执行的一个类库。听起来有点复杂,其实是要你使用过一次,就会大概明白它是怎么回事了!为是什么一个Android项目启动会联系到RxJava呢?因为在RxJava使用起来得到广泛的认可,又是基于Java语言的。自然会有善于组织和总结的开发者联想到Android!没错,RxAndroid就这样在RxJava的基础上,针对Android开发的一个库。今天我们主要是来讲解一下RxJava,在接下来的几篇博客中我会陆续带大家来认识RxAndroid,Retrofit框架的使用,这些都是目前比较火的一些技术框架!

这里是Github上RxJava的项目地址:https://github.com/ReactiveX/RxJava

技术文档Api:http://reactivex.io/RxJava/javadoc/

 

官方的介绍

1.支持Java6+

2.android 2.3+

3.异步的

4.基于观察者设计模式(Observer、Observable)不懂设计模式的可以移步到此:浅谈Java设计模式(十五)观察者模式(Observer)

5.Subscribe (订阅)

 

正式使用RxJava

用框架或者库都是为了简洁、方便,RxJava也不例外它能使你的代码逻辑更加的简洁。举个例子之前我们先来引入依赖的 gradle 代码:

[java] view plain copy
  1. compile ‘io.reactivex:rxjava:1.0.14’   
  2. compile ‘io.reactivex:rxandroid:1.0.1’   

既然是基于异步,当然要在处理比较耗时的操作上才能彰显它的优势!现在我们假设有这样一个需求:
需要实现一个多个下载的图片并且显示的功能,它的作用可以添加多个下载操作,由于下载这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。常用的实现方式有多种,我这里贴出其中一种:

[java] view plain copy
  1. new Thread() {  
  2.     @Override  
  3.     public void run() {  
  4.         super.run();  
  5.         for (File folder : folders) {  
  6.             File[] files = folder.listFiles();
  7.             for (File file : files) {  
  8.                 if (file.getName().endsWith(“.png”)) {  
  9.                     final Bitmap bitmap = getBitmapFromFile(file);  
  10.                     getActivity().runOnUiThread(new Runnable() {  
  11.                         @Override  
  12.                         public void run() {  
  13.                             imageCollectorView.addImage(bitmap);
  14.                         }
  15.                     });
  16.                 }
  17.             }
  18.         }
  19.     }
  20. }.start();

里面的判断是不是看起来有点晕晕,当然这是我自己写的,我一眼就能看清楚里面的逻辑,但是如果换做是别人来阅读你的代码,这就比较的尴尬了!
我们来看看使用RxJava的代码:

[java] view plain copy
  1. Observable.from(folders)
  2.     .flatMap(new Func1<File, Observable<File>>() {  
  3.         @Override  
  4.         public Observable<File> call(File file) {  
  5.             return Observable.from(file.listFiles());  
  6.         }
  7.     })
  8.     .filter(new Func1<File, Boolean>() {  
  9.         @Override  
  10.         public Boolean call(File file) {  
  11.             return file.getName().endsWith(“.png”);  
  12.         }
  13.     })
  14.     .map(new Func1<File, Bitmap>() {  
  15.         @Override  
  16.         public Bitmap call(File file) {  
  17.             return getBitmapFromFile(file);  
  18.         }
  19.     })
  20.     .subscribeOn(Schedulers.io())
  21.     .observeOn(AndroidSchedulers.mainThread())
  22.     .subscribe(new Action1<Bitmap>() {  
  23.         @Override  
  24.         public void call(Bitmap bitmap) {  
  25.             imageCollectorView.addImage(bitmap);
  26.         }
  27.     });

是不是明了,虽然说算不上简单,但是习惯了就一如既往了!

如果你使用的AndroidStudio的话,你打开Java文件的时候,你会看到被自动 Lambda 化的预览,这将让你更加清晰地看到程序逻辑:

[java] view plain copy
  1. Observable.from(folders)
  2.     .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
  3.     .filter((Func1) (file) -> { file.getName().endsWith(“.png”) })  
  4.     .map((Func1) (file) -> { getBitmapFromFile(file) })
  5.     .subscribeOn(Schedulers.io())
  6.     .observeOn(AndroidSchedulers.mainThread())
  7.     .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

不过如果你对Java8还不是很了解的话呢这一段可以暂时忽略,但是你可以移步到这里了解一下Java8:Java8部分新特性介绍

看完代码,是不是有种相见恨晚的冲动?别急,我们来慢慢了解RxJava!

 

前面已经提到他是基于Java观察者设计模式的,这个模式上面有给大家链接,可以去看看,这里不不坐过多的介绍,我们来介绍一下RxJava中的观察者模式:
RxJava 的观察者模式

一、说明
1)RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
2)与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
3)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
4)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
5)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的*后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

二、实现
1) 创建 Observer
Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的实现方式:

[java] view plain copy
  1. Observer<String> observer = new Observer<String>() {  
  2.     @Override  
  3.     public void onNext(String s) {  
  4.         Log.d(tag, “Item: ” + s);  
  5.     }
  6.     @Override  
  7.     public void onCompleted() {  
  8.         Log.d(tag, “Completed!”);  
  9.     }
  10.     @Override  
  11.     public void onError(Throwable e) {  
  12.         Log.d(tag, “Error!”);  
  13.     }
  14. };

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:

[java] view plain copy
  1. Subscriber<String> subscriber = new Subscriber<String>() {  
  2.     @Override  
  3.     public void onNext(String s) {  
  4.         Log.d(tag, “Item: ” + s);  
  5.     }
  6.     @Override  
  7.     public void onCompleted() {  
  8.         Log.d(tag, “Completed!”);  
  9.     }
  10.     @Override  
  11.     public void onError(Throwable e) {  
  12.         Log.d(tag, “Error!”);  
  13.     }
  14. };

不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:
onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以*好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

2) 创建 Observable
Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

[java] view plain copy
  1. Observable observable = Observable.create(new Observable.OnSubscribe<String>() {  
  2.     @Override  
  3.     public void call(Subscriber<? super String> subscriber) {  
  4.         subscriber.onNext(“Hello”);  
  5.         subscriber.onNext(“Hi”);  
  6.         subscriber.onNext(“Aloha”);  
  7.         subscriber.onCompleted();
  8.     }
  9. });

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式

create() 方法是 RxJava *基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如:
just(T…): 将传入的参数依次发送出来。

[java] view plain copy
  1. Observable observable = Observable.just(“Hello”, “Hi”, “Aloha”);  
  2. // 将会依次调用:  
  3. // onNext(“Hello”);  
  4. // onNext(“Hi”);  
  5. // onNext(“Aloha”);  
  6. // onCompleted();  

from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。

[java] view plain copy
  1. String[] words = {“Hello”, “Hi”, “Aloha”};  
  2. Observable observable = Observable.from(words);
  3. // 将会依次调用:  
  4. // onNext(“Hello”);  
  5. // onNext(“Hi”);  
  6. // onNext(“Aloha”);  
  7. // onCompleted();  

上面 just(T…) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等价的。

3) Subscribe (订阅)
创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

[java] view plain copy
  1. observable.subscribe(observer);
  2. // 或者:  
  3. observable.subscribe(subscriber);

Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

[java] view plain copy
  1. // 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。  
  2. // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。  
  3. public Subscription subscribe(Subscriber subscriber) {  
  4.     subscriber.onStart();
  5.     onSubscribe.call(subscriber);
  6.     return subscriber;  
  7. }

可以看到,subscriber() 做了3件事:
1.调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
2.调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
3.将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。形式如下:

[java] view plain copy
  1. Action1<String> onNextAction = new Action1<String>() {  
  2.     // onNext()  
  3.     @Override  
  4.     public void call(String s) {  
  5.         Log.d(tag, s);
  6.     }
  7. };
  8. Action1<Throwable> onErrorAction = new Action1<Throwable>() {  
  9.     // onError()  
  10.     @Override  
  11.     public void call(Throwable throwable) {  
  12.         // Error handling  
  13.     }
  14. };
  15. Action0 onCompletedAction = new Action0() {  
  16.     // onCompleted()  
  17.     @Override  
  18.     public void call() {  
  19.         Log.d(tag, “completed”);  
  20.     }
  21. };
  22. // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()  
  23. observable.subscribe(onNextAction);
  24. // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()  
  25. observable.subscribe(onNextAction, onErrorAction);
  26. // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()  
  27. observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的 Action1 和 Action0。 Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。 Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用*广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。

4) 场景示例

下面举两个例子:
a. 打印字符串数组
将字符串数组 names 中的所有字符串依次打印出来:

[java] view plain copy
  1. String[] names = …;
  2. Observable.from(names)
  3.     .subscribe(new Action1<String>() {  
  4.         @Override  
  5.         public void call(String name) {  
  6.             Log.d(tag, name);
  7.         }
  8.     });

b. 由 id 取得图片并显示
由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中,并在出现异常的时候打印 Toast 报错:

[java] view plain copy
  1. int drawableRes = …;  
  2. ImageView imageView = …;
  3. Observable.create(new OnSubscribe<Drawable>() {  
  4.     @Override  
  5.     public void call(Subscriber<? super Drawable> subscriber) {  
  6.         Drawable drawable = getTheme().getDrawable(drawableRes));
  7.         subscriber.onNext(drawable);
  8.         subscriber.onCompleted();
  9.     }
  10. }).subscribe(new Observer<Drawable>() {  
  11.     @Override  
  12.     public void onNext(Drawable drawable) {  
  13.         imageView.setImageDrawable(drawable);
  14.     }
  15.     @Override  
  16.     public void onCompleted() {  
  17.     }
  18.     @Override  
  19.     public void onError(Throwable e) {  
  20.         Toast.makeText(activity, “Error!”, Toast.LENGTH_SHORT).show();  
  21.     }
  22. });

正如上面两个例子这样,创建出 Observable 和 Subscriber ,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完成了。非常简单。

注意:在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler 。

线程控制 —— Scheduler (一)
前言:

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

1) Scheduler 的 API (一)
在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

代码来理解上面的文字叙述:

[java] view plain copy
  1. Observable.just(1, 2, 3, 4)  
  2.     .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程  
  3.     .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程  
  4.     .subscribe(new Action1<Integer>() {  
  5.         @Override  
  6.         public void call(Integer number) {  
  7.             Log.d(tag, “number:” + number);  
  8.         }
  9.     });

上面这段代码中,由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1、2、3、4 将会在 IO 线程发出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 数字的打印将发生在主线程 。事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

而前面提到的由图片 id 取得图片并显示的例子,如果也加上这两句:

[java] view plain copy
  1. int drawableRes = …;  
  2. ImageView imageView = …;
  3. Observable.create(new OnSubscribe<Drawable>() {  
  4.     @Override  
  5.     public void call(Subscriber<? super Drawable> subscriber) {  
  6.         Drawable drawable = getTheme().getDrawable(drawableRes));
  7.         subscriber.onNext(drawable);
  8.         subscriber.onCompleted();
  9.     }
  10. })
  11. .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程  
  12. .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程  
  13. .subscribe(new Observer<Drawable>() {  
  14.     @Override  
  15.     public void onNext(Drawable drawable) {  
  16.         imageView.setImageDrawable(drawable);
  17.     }
  18.     @Override  
  19.     public void onCompleted() {  
  20.     }
  21.     @Override  
  22.     public void onError(Throwable e) {  
  23.         Toast.makeText(activity, “Error!”, Toast.LENGTH_SHORT).show();  
  24.     }
  25. });

那么,加载图片将会发生在 IO 线程,而设置图片则被设定在了主线程。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。

2) Scheduler 的原理 (一)
RxJava 的 Scheduler API 很方便,也很神奇(加了一句话就把线程切换了,怎么做到的?而且 subscribe() 不是*外层直接调用的方法吗,它竟然也能被指定线程?)。然而 Scheduler 的原理需要放在后面讲,因为它的原理是以下一节《变换》的原理作为基础的。
好吧这一节其实我屁也没说,只是为了让你安心,让你知道我不是忘了讲原理,而是把它放在了更合适的地方。

变换
RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的*大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 API。
1) API
首先看一个 map() 的例子:

[java] view plain copy
  1. Observable.just(“images/logo.png”) // 输入类型 String  
  2.     .map(new Func1<String, Bitmap>() {  
  3.         @Override  
  4.         public Bitmap call(String filePath) { // 参数类型 String  
  5.             return getBitmapFromPath(filePath); // 返回类型 Bitmap  
  6.         }
  7.     })
  8.     .subscribe(new Action1<Bitmap>() {  
  9.         @Override  
  10.         public void call(Bitmap bitmap) { // 参数类型 Bitmap  
  11.             showBitmap(bitmap);
  12.         }
  13.     });

这里出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。这种直接变换对象并返回的,是*常见的也*容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。我列举几个常用的变换:
map(): 事件对象的直接变换,具体功能上面已经介绍过。它是 RxJava *常用的变换。 map() 的示意图:
flatMap(): 这是一个很有用但非常难理解的变换,因此我决定花多些篇幅来介绍它。 首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:

[java] view plain copy
  1. Student[] students = …;
  2. Subscriber<String> subscriber = new Subscriber<String>() {  
  3.     @Override  
  4.     public void onNext(String name) {  
  5.         Log.d(tag, name);
  6.     }
  7.     …
  8. };
  9. Observable.from(students)
  10.     .map(new Func1<Student, String>() {  
  11.         @Override  
  12.         public String call(Student student) {  
  13.             return student.getName();  
  14.         }
  15.     })
  16.     .subscribe(subscriber);

很简单。那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:

[java] view plain copy
  1. Student[] students = …;
  2. Subscriber<Student> subscriber = new Subscriber<Student>() {  
  3.     @Override  
  4.     public void onNext(Student student) {  
  5.         List<Course> courses = student.getCourses();
  6.         for (int i = 0; i < courses.size(); i++) {  
  7.             Course course = courses.get(i);
  8.             Log.d(tag, course.getName());
  9.         }
  10.     }
  11.     …
  12. };
  13. Observable.from(students)
  14.     .subscribe(subscriber);

依然很简单。那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢?
这个时候,就需要用 flatMap() 了:

[java] view plain copy
  1. Student[] students = …;
  2. Subscriber<Course> subscriber = new Subscriber<Course>() {  
  3.     @Override  
  4.     public void onNext(Course course) {  
  5.         Log.d(tag, course.getName());
  6.     }
  7.     …
  8. };
  9. Observable.from(students)
  10.     .flatMap(new Func1<Student, Observable<Course>>() {  
  11.         @Override  
  12.         public Observable<Course> call(Student student) {  
  13.             return Observable.from(student.getCourses());  
  14.         }
  15.     })
  16.     .subscribe(subscriber);

从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

扩展:由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):

[java] view plain copy
  1. networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token  
  2.     .flatMap(new Func1<String, Observable<Messages>>() {  
  3.         @Override  
  4.         public Observable<Messages> call(String token) {  
  5.             // 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表  
  6.             return networkClient.messages();  
  7.         }
  8.     })
  9.     .subscribe(new Action1<Messages>() {  
  10.         @Override  
  11.         public void call(Messages messages) {  
  12.             // 处理显示消息列表  
  13.             showMessages(messages);
  14.         }
  15.     });

传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。
throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器: RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms .subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。
此外, RxJava 还提供很多便捷的方法来实现事件序列的变换,这里就不一一举例了。

2) 变换的原理:lift()
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。

[java] view plain copy
  1. public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {  
  2.     return Observable.create(new OnSubscribe<R>() {  
  3.         @Override  
  4.         public void call(Subscriber subscriber) {  
  5.             Subscriber newSubscriber = operator.call(subscriber);
  6.             newSubscriber.onStart();
  7.             onSubscribe.call(newSubscriber);
  8.         }
  9.     });
  10. }

这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——
subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。
当含有 lift() 时:
1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;
2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;
3.当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;
4.而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。
这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。
精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子,仅供参考:

[java] view plain copy
  1. observable.lift(new Observable.Operator<String, Integer>() {  
  2.     @Override  
  3.     public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {  
  4.         // 将事件序列中的 Integer 对象转换为 String 对象  
  5.         return new Subscriber<Integer>() {  
  6.             @Override  
  7.             public void onNext(Integer integer) {  
  8.                 subscriber.onNext(“” + integer);  
  9.             }
  10.             @Override  
  11.             public void onCompleted() {  
  12.                 subscriber.onCompleted();
  13.             }
  14.             @Override  
  15.             public void onError(Throwable e) {  
  16.                 subscriber.onError(e);
  17.             }
  18.         };
  19.     }
  20. });

3) compose: 对 Observable 整体的变换
除了 lift() 之外, Observable 还有一个变换方法叫做 compose(Transformer)。它和 lift() 的区别在于, lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。你可以这么写:

[java] view plain copy
  1. observable1
  2.     .lift1()
  3.     .lift2()
  4.     .lift3()
  5.     .lift4()
  6.     .subscribe(subscriber1);
  7. observable2
  8.     .lift1()
  9.     .lift2()
  10.     .lift3()
  11.     .lift4()
  12.     .subscribe(subscriber2);
  13. observable3
  14.     .lift1()
  15.     .lift2()
  16.     .lift3()
  17.     .lift4()
  18.     .subscribe(subscriber3);
  19. observable4
  20.     .lift1()
  21.     .lift2()
  22.     .lift3()
  23.     .lift4()
  24.     .subscribe(subscriber1);

你觉得这样太不软件工程了,于是你改成了这样:

[java] view plain copy
  1. private Observable liftAll(Observable observable) {  
  2.     return observable  
  3.         .lift1()
  4.         .lift2()
  5.         .lift3()
  6.         .lift4();
  7. }
  8. liftAll(observable1).subscribe(subscriber1);
  9. liftAll(observable2).subscribe(subscriber2);
  10. liftAll(observable3).subscribe(subscriber3);
  11. liftAll(observable4).subscribe(subscriber4);

可读性、可维护性都提高了。可是 Observable 被一个方法包起来,这种方式对于 Observale 的灵活性似乎还是增添了那么点限制。怎么办?这个时候,就应该用 compose() 来解决了:

[java] view plain copy
  1. public class LiftAllTransformer implements Observable.Transformer<Integer, String> {  
  2.     @Override  
  3.     public Observable<String> call(Observable<Integer> observable) {  
  4.         return observable  
  5.             .lift1()
  6.             .lift2()
  7.             .lift3()
  8.             .lift4();
  9.     }
  10. }
  11. Transformer liftAll = new LiftAllTransformer();  
  12. observable1.compose(liftAll).subscribe(subscriber1);
  13. observable2.compose(liftAll).subscribe(subscriber2);
  14. observable3.compose(liftAll).subscribe(subscriber3);
  15. observable4.compose(liftAll).subscribe(subscriber4);

像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。
compose() 的原理比较简单,不附图喽。

线程控制:Scheduler (二)
除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。
1) Scheduler 的 API (二)
前面讲到了,可以利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap() 等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?
答案是:能。因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。上代码:

[java] view plain copy
  1. Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定  
  2.     .subscribeOn(Schedulers.io())
  3.     .observeOn(Schedulers.newThread())
  4.     .map(mapOperator) // 新线程,由 observeOn() 指定  
  5.     .observeOn(Schedulers.io())
  6.     .map(mapOperator2) // IO 线程,由 observeOn() 指定  
  7.     .observeOn(AndroidSchedulers.mainThread)
  8.     .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定  

如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。
不过,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。
又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn() 呢?会有什么效果?
这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。
2) Scheduler 的原理(二)
其实, subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程):
从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 “schedule…” 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。
3) 延伸:doOnSubscribe()
然而,虽然超过一个的 subscribeOn() 对事件处理的流程没有影响,但在流程之前却是可以利用的。
在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。
而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它*近的 subscribeOn() 所指定的线程。
示例代码:

[java] view plain copy
  1. Observable.create(onSubscribe)
  2.     .subscribeOn(Schedulers.io())
  3.     .doOnSubscribe(new Action0() {  
  4.         @Override  
  5.         public void call() {  
  6.             progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行  
  7.         }
  8.     })
  9.     .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程  
  10.     .observeOn(AndroidSchedulers.mainThread())
  11.     .subscribe(subscriber);

如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了

213. 打家劫舍 II(JS实现)

213. 打家劫舍 II(JS实现)
1 题目
你是一个专业的小偷,计划偷窃沿街的房屋,每间房内都藏有一定的现金。这个地方所有的房屋都围成一圈,这意味着*个房屋和*后一个房屋是紧挨着的。同时,相邻的房屋装有相互连通的防盗系统,如果两间相邻的房屋在同一晚上被小偷闯入,系统会自动报警。
给定一个代表每个房屋存放金额的非负整数数组,计算你在不触动警报装置的情况下,能够偷窃到的*高金额。
示例 1:
输入: [2,3,2]
输出: 3
解释: 你不能先偷窃 1 号房屋(金额 = 2),然后偷窃 3 号房屋(金额 = 2), 因为他们是相邻的。
示例 2:
输入: [1,2,3,1]
输出: 4
解释: 你可以先偷窃 1 号房屋(金额 = 1),然后偷窃 3 号房屋(金额 = 3)。
偷窃到的*高金额 = 1 + 3 = 4 。
链接:https://leetcode-cn.com/problems/house-robber-ii
2 思路
这道题考察动态规划,与打家劫舍一不同的是,这次的房屋围成了一个环状,如果偷了*家,就一定不能偷*后一家,因此我们考虑将环状拆分为两段,分别为1…n-1和2…n,分别用动态规划计算其能偷到的*大金额,然后两者取*大即可
3代码
/**
 * @param {number[]} nums
 * @return {number}
 */
var rob = function(nums) {
    if (nums.length === 1 || nums.length === 2) return Math.max(…nums);
    if (nums.length === 0) return 0;
    const d = [nums[0]];
    const d1 = [0, nums[1]];
    for (let i = 1; i<nums.length-1; i++) {
        if (i === 1) {
            d[i] = Math.max(nums[0], nums[1]);
        } else {
            d[i] = Math.max(d[i-1], d[i-2] + nums[i]);
        }
    }
    for (let i = 2; i<nums.length; i++) {
        if (i === 2) {
            d1[i] = Math.max(nums[1], nums[2]);
        } else {
            d1[i] = Math.max(d1[i-1], d1[i-2] + nums[i]);
        }
    }
    return Math.max(d[d.length – 1], d1[d1.length – 1]);
};

215. 数组中的第K个*大元素(JS实现)

215. 数组中的第K个*大元素(JS实现)
1 题目
在未排序的数组中找到第 k 个*大的元素。请注意,你需要找的是数组排序后的第 k 个*大的元素,而不是第 k 个不同的元素。
示例 1:
输入: [3,2,1,5,6,4] 和 k = 2
输出: 5
示例 2:
输入: [3,2,3,1,2,4,5,5,6] 和 k = 4
输出: 4
说明:
你可以假设 k 总是有效的,且 1 ≤ k ≤ 数组的长度。
链接:https://leetcode-cn.com/problems/kth-largest-element-in-an-array
2 思路
这道题考察排序,根据题意,选择堆排序比较合适
3代码
/**
 * @param {number[]} nums
 * @param {number} k
 * @return {number}
 */
var findKthLargest = function(nums, k) {
 if (nums.length <= 1) return nums[k-1] || null;
  nums.unshift(null);     //由于孩子节点的索引为父节点索引的2倍,因此为了计算方便数组*项留空
  let len = Math.floor((nums.length-1) / 2);
  for (let i=len; i>=1; i–) {
    adjust(i, nums.length-1);
  }
  for (let i=nums.length-1; i>=nums.length – k; i–) {   //找到第k大的数后,结束循环
    swap(1, i);
    adjust(1, i-1);
  }
  return nums[nums.length – k];
  function adjust(s, maxIndex) {    //调整不满足的子树结构使其符合大顶堆,s为需要调整的数
    let temp = nums[s];
    for (let i=2*s; i<=maxIndex; i*=2) {    //依次比较子节点,找到放置s的位置
      if (i+1 <= maxIndex && nums[i] < nums[i+1]) i++;
      if (temp > nums[i]) break;
      nums[s] = nums[i];
      s = i;
    }
    nums[s] = temp;
  }
  function swap(i,j) {
    let temp = nums[i];
    nums[i] = nums[j];
    nums[j] = temp;
  }
};

216. 组合总和 III(JS实现)

216. 组合总和 III(JS实现)
\
1 题目
找出所有相加之和为 n 的 k 个数的组合。组合中只允许含有 1 – 9 的正整数,并且每种组合中不存在重复的数字。
说明:
所有数字都是正整数。
解集不能包含重复的组合。
示例 1:
输入: k = 3, n = 7
输出: [[1,2,4]]
示例 2:
输入: k = 3, n = 9
输出: [[1,2,6], [1,3,5], [2,3,4]]
链接:https://leetcode-cn.com/problems/combination-sum-iii
2 思路
这道题还是考察回溯+剪枝,整体的思路就是构造一棵树,*后能到达叶子节点的路径就是有效的结果
3代码
/**
 * @param {number} k
 * @param {number} n
 * @return {number[][]}
 */
var combinationSum3 = function(k, n) {
  const res = [];
  if (n < 1 || k < 1 || k > 9) return res;
  let nums = [1,2,3,4,5,6,7,8,9];
  function d(arr, index, num, begin) {
    for (let i=begin; i<9; i++) {
      let temp = num – nums[i];
      if (temp < 0) break;
      if (temp > 0 && index > 1) {
        let tempArr = arr.slice();
        tempArr.push(nums[i]);
        d(tempArr, index-1, temp, i+1);
      } else if (temp === 0 && index === 1) {
        let tempArr = arr.slice();
        tempArr.push(nums[i]);
        res.push(tempArr);
      }
    }
  }
  d([], k, n, 0);
  return res;
};
\

220. 存在重复元素 III(JS实现)

220. 存在重复元素 III(JS实现)
1 题目
在整数数组 nums 中,是否存在两个下标 i 和 j,使得 nums [i] 和 nums [j] 的差的*对值小于等于 t ,且满足 i 和 j 的差的*对值也小于等于 ķ 。
如果存在则返回 true,不存在返回 false。
示例 1:
输入: nums = [1,2,3,1], k = 3, t = 0
输出: true
示例 2:
输入: nums = [1,0,1,1], k = 1, t = 2
输出: true
示例 3:
输入: nums = [1,5,9,1,5,9], k = 2, t = 3
输出: false
链接:https://leetcode-cn.com/problems/contains-duplicate-iii
2 思路
这道题我是用土办法,用滑动窗口来做的,时间复杂度有点高,题解中有用平衡二叉树和桶排序的方法,比较巧妙,在此记录下桶排序的方法 https://leetcode-cn.com/problems/contains-duplicate-iii/solution/cun-zai-zhong-fu-yuan-su-iii-by-leetcode/
3代码
/**
 * @param {number[]} nums
 * @param {number} k
 * @param {number} t
 * @return {boolean}
 */
var containsNearbyAlmostDuplicate = function(nums, k, t) {
    if (nums.length === 0 || t < 0 || k < 0) return false;
    let arr = [0];
    for (let i = 1; i<nums.length; i++) {
        let index = 0;
        for (let j of arr) {
            if (i – j > k) {
                index++;
                continue;
            }
            if (Math.abs(nums[i] – nums[j]) > t) continue;
            return true;
        }
        arr = arr.slice(index);
        arr.push(i);
    }
    return false;
};

221. *大正方形(JS实现)

221. *大正方形(JS实现)
1 题目
在一个由 0 和 1 组成的二维矩阵内,找到只包含 1 的*大正方形,并返回其面积。
示例:
输入:
1 0 1 0 0
1 0 1 1 1
1 1 1 1 1
1 0 0 1 0
输出: 4
链接:https://leetcode-cn.com/problems/maximal-square
2 思路
这道题可以用动态规范的方法来做,d[i][j]为右下角为matrix[i][j]的符合题意的正方形*大边长,那么状态转移方程,d[i+1][j+1] = Math.min(d[i][j+1], d[i][j], d[i+1][j]) + 1,可以自己画图看一下是不是这个规律
3代码
/**
 * @param {character[][]} matrix
 * @return {number}
 */
var maximalSquare = function(matrix) {
    let rows = matrix.length;
    if (rows === 0) return 0;
    let cols = matrix[0].length;
    const d = [];
    let maxEdge = 0;
    for (let i=0; i<rows; i++) {
        if (!d[i]) d[i] = [];
        for (let j=0; j<cols; j++) {
            if (i > 0 && j > 0 && matrix[i][j] === ‘1’ && d[i-1][j-1] > 0) {
                let m = i;
                for (; m>=i-d[i-1][j-1]; m–) {    //遍历当前行看是否满足正方形条件
                    if (matrix[m][j] === ‘0’) {
                        break;
                    }
                }
                let n = j;
                for (; n>=j-d[i-1][j-1]; n–) {   //遍历当前列看是否满足正方形条件
                    if (matrix[i][n] === ‘0’) {
                        break;
                    }
                }
                let currentEdge = Math.min(i-m, j-n);
                d[i][j] = currentEdge;
                maxEdge = Math.max(d[i][j], maxEdge)
            } else {
                d[i][j] = parseInt(matrix[i][j]);
                if (maxEdge === 0) maxEdge = Math.max(d[i][j], maxEdge);
            }
        }
    }
    return maxEdge * maxEdge;
};

云服务器哪家好?阿里云腾讯云华为云要怎么选?

不管你是创业型公司还是小有成绩的中小型企业又或是规模已算是上市企业,不管你是想搭建一个属于自己的网站还是要完成一些线上线下的项目,现如今服务器已然成为各行各业所需标配。以前的公司想做搭建自己的网站(企业网站,论坛社区等),想实现一些线上的项目(如搭建一个电商平台、游戏app、服务企业的第三方软件等)就得自己建立一些机房储备一台大型的“电脑主机”用来存储用户,收集数据,实现计算,处理数据,而这种方式的弊端就是硬件设备成本大,维护费用高,为此云服务器就顺势而生,华为,阿里,腾讯亚马逊等相继推出云服务器供应给企业使用,那么企业主们要如何选择云服务呢?
1、华为云服务器
华为云服务器*大的优势就是服务性能和安全,有着“上不碰应用,下不碰数据”的天然优势,与阿里云和腾讯云以数据变现相比,华为云主要以技术和服务为主,在安全防护和信息防护上确实做得*好的,这也是很多政府,银行,高校和军工企业选择华为云的原因,另外华为云是以公有云为主的,所以阿里云和腾讯云以及其他云服务器的厂商都有用到华为云的产品来跟自己的私有云进行混合搭配,构建混合云来供应给客户使用,所以在一定程度上华为云更属于基础设施服务,能为客户提供更纯净的平台环境。劣势的话,相比腾讯云和阿里云,价格上可能会稍微贵一些,不过这也是2017华为云更推出的价格,今年从三方推出的活动来看的话价格上不会有特别大的差别。不过想购买华为云服务器的企业主不一定要直接在华为云官网注册就是,可以找他们的代理商注册,在代理商那边注册好像会有一些优惠。
2、阿里云服务器
说到阿里云服务器,我想关于它的故事肯定不会少,这里我就不详细的说了,阿里云是2009年创立的,2011年才正式对外提供云计算的服务,属于国内一个吃蛋糕的人,所以市场份额目前*,2019年营收好像是52亿美元,占了市场份额的60%左右。而紧跟其后的就是华为云和腾讯云了,要么华为云第二,要么就是腾讯云第二,不过华为云是2017年才推出的,仅仅4年的时间就达到了腾讯云10年的业绩,不得不佩服华为精神啊!而阿里云*大的优势在于数据处理,阿里云国内起步比较早,云产品也比较丰富,不过也正是因为这样,一般选择了阿里云服务器后期需要迁移到别也是比较麻烦的,至于价格,确实经常推出优惠,对于预算不是特别多的,是非常推荐的。
3、腾讯云服务器
腾讯云一开始时是没有独立开来的,它*早孵化于腾讯QQ的事业群,2013年开始推出,2019年腾讯云营收大概在170亿元,腾讯云*大的优势在于网络速度,至少在流畅性上,有人测试过1核2g的服务器,腾讯云确实在流畅性上是高于阿里云的,在windows操作系统优化上略好于阿里云。至于价格方面上,腾讯云也是稍微有优势的,虽然两者都经常推出活动,但阿里云更多的是服务于中小用户为主,配置会相对低一些。
4、该如何选择
至于这三方该选择什么样的云服务器,关键看自己的需求和预算,如果预算足够多,项目所需各方面支持比较大,那就选择华为云,毕竟华为云用3年的时间就达到了腾讯云的成绩,对阿里云造成一定的威胁,除了团队的原因,我想更多的还是产品被大多数企业认可,也有不少阿里云和腾讯云的用户纷纷迁移到华为云了。如果是预算不够,只是做个低配的网站,涉及的用户量不是特别多,那就选择阿里云和腾讯云,毕竟节省成本才是王道。

国内服务器怎么选择?腾讯云、阿里云、天翼云?

随着中国企业云服务器使用率的不断提升,虽然与国外一些国家相比还有很大差距。但得益于政策红利和中国企业的数字化转型,市场潜力空间仍然很大,而作为互联网行业中的一员,我们也应当对云服务器的基础知识有一定的了解,利用具有较多优势的云服务平台,研发应用层人工智能产品提供决策辅助。

一、什么是云服务器
云服务器(Elastic Compute Service, ECS)是一种简单高效、安全可靠、处理能力可弹性伸缩的计算服务。其管理方式比物理服务器更简单高效。用户无需提前购买硬件,即可迅速创建或释放任意多台云服务器。

从功能上来看,云服务器主要承载三大方面功能:
云服务器要实现存储功能(分布式)
实现计算功能(分布式)
实现资源整合功能

云计算以分布式存储和分布式计算为核心,通过采用虚拟化的方式来实现资源的动态管理,通过资源整合的方式来实现自身功能的扩充,这一点主要是为了提供PaaS相关服务。
云服务器帮助您快速构建更稳定、安全的应用,降低开发运维的难度和整体IT成本,使您能够更专注于核心业务的创新。

二、云服务器可以解决哪些问题
降低成本
缩短业务部署周期
数据计算及处理功能强大

随着IT行业在全球范围内的快速发展,IT平台的规模和复杂程度出现了大幅度的提升,但是,高昂的硬件和运维管理成本、漫长的业务部署周期以及数据处理能力薄弱为企业IT部门制造了重重障碍。云服务器技术颠覆性的改变了传统IT行业的消费模式和服务模式,消费者实现了从以前的“购买软硬件产品”向“购买云服务”转变,相当于不必自己购买,通过租赁就实现,大大提高了IT效率和敏捷性。

三、云服务提供商介绍
国内的有阿里云、腾讯云、华为云、天翼云以及一些初创型小公司等等。下面介绍下国内的云服务商各自的优缺点:
1.阿里云
阿里云创立于2009年,是亚洲*大的云计算平台和云计算服务提供商,和亚马逊AWS、微软Azure共同构成了全球云计算市场*阵营。在国内,阿里云的市场占有率高居*,甚至超过了第二名到第九名之和,是国内云计算市场公认的领头羊和行业巨头。

阿里云的服务群体中,活跃着淘宝、支付宝、12306、中石化、中国银行、中科院、中国联通、微博、知乎、锤子科技等一大批明星产品和公司。它的目标客户*为全面,基本上包含了所有行业,其中个人开发者、互联网用户及中小企业用户占据了将近90%。

阿里云优点
规模大
亚太:据Gartner数据,阿里云以19.6%的市场占有率成为了亚太市场*。
中国:根据IDC报告,阿里云在中国云计算市场有*对领导力,市场份额位居中国*。
全球:据Gartner数据,在全球云计算市场中,新兴市场只有阿里云脱颖而出,位列全球第三。

稳定
由于阿里云拥有庞大的飞天平台,所以服务也非常稳定高效,在天猫双11全球狂欢节、12306春运购票等*富挑战的应用场景中,阿里云保持着良好的运行纪录。

服务品种多样
由于阿里云创建于2009年,时间非常早,至今已经发展了11年之久,所以服务品种也非常丰富多样。

生态完善
阿里云被广泛的生态伙伴所集成,全球合作伙伴数量超过10000家,服务客户超过10万家。 在技术领域,阿里云是国际开源社区贡献*大的中国公司。

2.腾讯云
腾讯云于2013年9月正式对外全面开放,腾讯云经过QQ、QQ 空间、微信、腾讯游戏等业务的技术锤炼,从基础架构到精细化运营,从平台实力到生态能力建设,腾讯云得到了全面的发展,使之能够为企业和创业者提供集云计算、云数据、云运营于一体的云端服务体验。

腾讯云目标客户聚焦于“社交、游戏”两大领域。腾讯云在国内市场占据18%的市场份额,紧随阿里云之后。
腾讯云业务主要包括:云计算基础服务、存储与网络、安全、数据库服务、人工智能、行业解决方案等。

腾讯云优点
“社交”与“游戏”领域实力强
由于腾讯在社交领域的强大,依托QQ、微信开放平台吸引了大量个人用户和中小企业开发者,选择腾讯云会有更好的兼容性。
同理,腾讯是国内*大的游戏厂商,如果公司的游戏产品依托于腾讯的平台,选择腾讯云肯定不会有错。

3.华为云
华为云成立于2011年,专注于云计算中“公有云”领域的技术研究与生态拓展,致力于为用户提供一站式的云计算基础设施服务,是目前国内大型的公有云服务与解决方案提供商之一。华为云在国内市场占据8%左右的市场份额。

华为云,立足于互联网领域,依托华为公司雄厚的资本和强大的云计算研发实力,目标客户聚焦于互联网增值服务运营商、大中小型企业、政府机构、科研院所等广大企业、事业单位的用户,提供包括“云主机、云托管、云存储”等基础云服务、“DDoS高防ADD、数据库安全、数据加密、Web防火墙”等安全服务,以及“域名注册、云速建站、混合云灾备、智慧园区”等解决方案。

优点
全栈AI
华为云提供全栈全场景AI解决方案,是业界仅有的两家全栈AI云服务商之一。并且,华为云一站式开发平台ModelArts已在十大行业200多个AI项目中进行实践,并以*落地的方式使能产业向智能化平滑演进。

适合政府和大中企业
凭借八年深耕企业市场和华为自身数字化转型的实践经验,财富500强企业中已有221家选择华为。在不断实践的过程中,华为云助力合作伙伴走向智能化的案例颇丰,例如华为云为深圳坂田提供的智慧交通系统,令路口通行效率提升了17%;与德邦快递深入合作,实现了科技与快递业务场景的融合,快递服务体验全面升级。

适合跨国企业
华为云已在全球23个地理区域运营了40个可用区,帮助用户一点接入,全球通达。其中,与中国三大运营商无缝衔接的合作,便可证明华为云是国内首个可以为企业提供安全合规的跨境联网服务的公有云。

4.天翼云
天翼云是中国电信旗下云计算品牌,于2016被中国电信注册,用于中国电信股份有限公司云计算分公司商标使用,是中国电信旗下的的云计算服务提供商。目标用户主要面向企业市场,依靠电信在IDC、数据中心服务等领域的客户为基础,进行追加销售和捆绑销售。但由于中国电信本身的数据体量也非常大,它自己本身使用也是非常大的需求,中国电信在政府、国企等领域还是有很多的客户资源可用,对于其云计算的销售有不小的促进作用。例如大家*熟悉的苹果的icloud国内云存储服务即云上贵州,就是与天翼云牵手合作的。

天翼云优点
安全
提供运营商级保障,并且使用的是符合苹果严苛标准的中国电信资源池,账号信息加密保存,即使攻破数据库,他人也无法得知你的密码。

4.初创公司
初创公司其实在某个层面上也可以理解为小公司,也由于公司体量的小,目标客户也主要是个人用户和小微企业。市面上还是存在很多小型云服务商的,例如小鸟云,天达云等等,基本上知名度很小的,大部分都是初创公司,这里就不详细介绍了,因为他们的优缺点其实都是相似的。

初创公司优点
便宜
对于个人用户来说,这一点是很重要的,为了争夺市场,初创公司的定价都会非常便宜。

缺点
安全性较差
服务品种较少,品质较差

总结
综上所述,大型企业以及政府部门机构方面,华为云、阿里云比较占优势;而浪潮云专注政务云,在政务云方面优势很大;天翼云这类电信运营商创建的更注重于自用;腾讯云在游戏、社交领域得益于老爹腾讯的地位,独领风骚;Ucloud虽然名声不显,但在低调的程序员群体中颇受欢迎;而对更多的个人用户来说,初创公司虽然品质不高,但价格非常划算;但总的来说,阿里云依然占据着国内毋庸置疑的老大位置,并且将他的一众竞争对手距离拉的非常远。