RxJava 和 RxAndroid (生命周期控制和内存优化)

RxJava使我们很方便的使用链式编程,代码看起来既简洁又优雅。但是RxJava使用起来也是有副作用的,使用越来越多的订阅,内存开销也会变得很大,稍不留神就会出现内存溢出的情况,这篇文章就是介绍Rxjava使用过程中应该注意的事项。

1、取消订阅 subscription.unsubscribe() ;

package lib.com.myapplication;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;

public class MainActivity extends AppCompatActivity {

    Subscription subscription ;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        subscription =  Observable.just( "123").subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println( "tt--" + s );
            }
        }) ;
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        //取消订阅
        if ( subscription != null ){
            subscription.unsubscribe();
        }
    }
}

2、线程调度

  • Scheduler调度器,相当于线程控制器
    • Schedulers.immediate() : 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    • Schedulers.newThread() :总是启用新线程,并在新线程执行操作.
    • Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation() : 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • 还有RxAndroid里面专门提供了AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
  • 常见的场景:为了不阻塞UI,在子线程加载数据,在主线线程显示数据
          Observable.just( "1" , "2" , "3" )
                .subscribeOn(Schedulers.io())  //指定 subscribe() 发生在 IO 线程
                .observeOn( AndroidSchedulers.mainThread() )  //指定 Subscriber 的回调发生在主线程
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        textView.setText( s );
                    }
                }) ;
    

    上面这段代码,数据”1″、”2″、”3″将在io线程中发出,在android主线程中接收数据。这种【后台获取数据,前台显示数据】模式适用于大多数的程序策略。

  • Scheduler 自由多次切换线程。恩,这个更为牛逼
    Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
     .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.newThread())
     .map(mapOperator) // 新线程,由 observeOn() 指定
     .observeOn(Schedulers.io())
     .map(mapOperator2) // IO 线程,由 observeOn() 指定
     .observeOn(AndroidSchedulers.mainThread) 
     .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定
    

    从上面的代码可以看出

    • observeOn() 可以调用多次来切换线程,observeOn 决定他下面的方法执行时所在的线程。
    • subscribeOn() 用来确定数据发射所在的线程,位置放在哪里都可以,但它是只能调用一次的。

  • 上面介绍了两种控制Rxjava生命周期的方式,*种:取消订阅 ;第二种:线程切换 。这两种方式都能有效的解决android内存的使用问题,但是在实际的项目中会出现很多订阅关系,那么取消订阅的代码也就越来越多。造成了项目很难维护。所以我们必须寻找其他可靠简单可行的方式,也就是下面要介绍的。

3、rxlifecycle 框架的使用

  • github地址: https://github.com/trello/RxLifecycle
  • 在android studio 里面添加引用
    compile 'com.trello:rxlifecycle-components:0.6.1'
  • 让你的activity继承RxActivity,RxAppCompatActivity,RxFragmentActivity
    让你的fragment继承RxFragment,RxDialogFragment;下面的代码就以RxAppCompatActivity举例
  • bindToLifecycle 方法
    在子类使用Observable中的compose操作符,调用,完成Observable发布的事件和当前的组件绑定,实现生命周期同步。从而实现当前组件生命周期结束时,自动取消对Observable订阅。

     public class MainActivity extends RxAppCompatActivity {
            TextView textView ;
            
            @Override
            protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            textView = (TextView) findViewById(R.id.textView);
        
            //循环发送数字
            Observable.interval(0, 1, TimeUnit.SECONDS)
                .subscribeOn( Schedulers.io())
                .compose(this.<Long>bindToLifecycle())   //这个订阅关系跟Activity绑定,Observable 和activity生命周期同步
                .observeOn( AndroidSchedulers.mainThread())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        System.out.println("lifecycle--" + aLong);
                        textView.setText( "" + aLong );
                    }
                });
           }
        }
    

    上面的代码是Observable循环的发送数字,并且在textview中显示出来
    1、没加 compose(this.<Long>bindToLifecycle()) 当Activiry 结束掉以后,Observable还是会不断的发送数字,订阅关系没有解除
    2、添加compose(this.<Long>bindToLifecycle()) 当Activity结束掉以后,Observable停止发送数据,订阅关系解除。

  • 从上面的例子可以看出bindToLifecycle() 方法可以使Observable发布的事件和当前的Activity绑定,实现生命周期同步。也就是Activity 的 onDestroy() 方法被调用后,Observable 的订阅关系才解除。那能不能指定在Activity其他的生命状态和订阅关系保持同步,答案是有的。就是 bindUntilEvent()方法。这个逼装的好累!
  • bindUntilEvent( ActivityEvent event)
    • ActivityEvent.CREATE: 在Activity的onCreate()方法执行后,解除绑定。
    • ActivityEvent.START:在Activity的onStart()方法执行后,解除绑定。
    • ActivityEvent.RESUME:在Activity的onResume()方法执行后,解除绑定。
    • ActivityEvent.PAUSE: 在Activity的onPause()方法执行后,解除绑定。
    • ActivityEvent.STOP:在Activity的onStop()方法执行后,解除绑定。
    • ActivityEvent.DESTROY:在Activity的onDestroy()方法执行后,解除绑定。
     //循环发送数字
         Observable.interval(0, 1, TimeUnit.SECONDS)
                 .subscribeOn( Schedulers.io())
                 .compose(this.<Long>bindUntilEvent(ActivityEvent.STOP ))   //当Activity执行Onstop()方法是解除订阅关系
                 .observeOn( AndroidSchedulers.mainThread())
                 .subscribe(new Action1<Long>() {
                     @Override
                     public void call(Long aLong) {
                         System.out.println("lifecycle-stop-" + aLong);
                         textView.setText( "" + aLong );
                     }
                 });
    

    经过测试发现,当Activity执行了onStop()方法后,订阅关系已经解除了。
    上面说的都是订阅事件与Activity的生命周期同步,那么在Fragment里面又该怎么处理的?

  • FragmentEvent 这个类是专门处理订阅事件与Fragment生命周期同步的大杀器
    public enum FragmentEvent {
    
    ATTACH,
    CREATE,
    CREATE_VIEW,
    START,
    RESUME,
    PAUSE,
    STOP,
    DESTROY_VIEW,
    DESTROY,
    DETACH
    }
    

    可以看出FragmentEvent 和 ActivityEvent 类似,都是枚举类,用法是一样的。这里就不举例了!

总结
1、这三篇文章的相关代码示例都在 http://git.oschina.net/zyj1609/RxAndroid_RxJava
2、通过上面的三种方法,我相信你在项目中使用Rxjava的时候,已经能够很好的控制了 Rxjava对内存的开销。如果你有其他的方法或者问题,可以留言给我。

RxJava操作符之Share, Publish, Refcount

看源码知道.share()操作符是.publish().refcount()调用链的包装。

先来看ConnectedObservable

“ConnectedObservable” – This is a kind of observable which doesn’t emit items even if subscribed to.
It only starts emitting items after its .connect() method is called.

因为这个原因,在ConnectedObservable的connect这个方法被调用之前,connected obesrvable也被认为是“冷”和不活跃。

再看publish方法

.publish()– This method allows us to change an ordinary observable into a “ConnectedObservable”.
Simply call this method on an ordinary observable and it becomes a connected one.

现在我们知道了share操作符的1/2,那么为什么需要运用Connected Observable这个操作符呢?文档上是这么写的:

In this way you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.

这就意味着publish可以调用多个subscriber。当你有超过一个订阅者的时候,处理每个订阅和正确的销毁他们变得棘手。 为了使这个更方便,Rx发明了这个魔法操作符refcount():

refcount() – This operator keeps track of how many subscribers are subscribed to the resulting Observable and
refrains from disconnecting from the source ConnectedObservable until all such Observables are unsubscribed.

refcount本质上在后台维护着一个引用计数器,当一个subscription需要取消订阅或者销毁的时候,发出一个正确的动作。

我们再次看一下debouncedBuffer的例子,看一下在哪,share是怎么用的。

Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
// which is really the same as:
Observable<Object> tapEventEmitter = _rxBus.toObserverable().publish().refcount();

我们现在有了一个”shareable”的名字叫”tapEventEmitter”的observable。 因为他是可以分享的,而且还不是“live”(share操作符中的publish调用使其变成一个ConnectedObservable), 我们可以用他构成我们的Observables,而且要确保我们有了一个原始的observable的引用 (这个例子中原始的observable是_rxBus.toObserverable()).

Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
Observable<Object> debouncedEventEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
tapEventEmitter.buffer(debouncedEventEmitter)
//...

所有的这一切看起来都很好。然而这个实现会有一个可能的竞争条件。因为这有两个subscribers(debounce and buffer)而且会在不同的时间点发生,所以竞争条件就会发生。 记住RxBus是由hot/live Subject支持的不断的发射数据。通过使用share操作符,我们保证引用的是同一个资源。 而不是subscribers在不同的时间点订阅,他们会收到准确的相同的数据。

The race condition is when the two consumers subscribe. Often on a hot stream it doesn’t matter when subscribers come and go,and refCount is perfect for that.
The race condition refCount protects against is having only 1 active subscription upstream. However,if 2 Subscribers subscribe to a refcounted stream that emits 1, 2, 3, 4, 5, the first may get 1, 2, 3, 4, 5 and the second may get 2, 3, 4, 5.

To ensure all subscribers start at exactly the same time and get the exact same values, refCount can not be used.
Either ConnectableObservable with a manual, imperative invocation of connect needs to be done, or the variant of publish(function)which connects everything within the function before connecting the upstream.

在我们的用法中几乎立即执行所以没有什么关系。但是我们原始的意图是把debouncedBuffer方法作为一个单独的操作符。 如果相同的事件没有被发射出去,从概念上看起来是不正确的。

通过Bean后来的建议,我添加了一个更好的第三方的实现,用来处理这种竞争条件。

// don't start emitting items just yet by turning the observable to a connected one
ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish();

tapEventEmitter.publish((Func1) (stream) -> {

// inside `publish`, "stream" is truly multicasted

// applying the same technique for getting a debounced buffer sequence
    return stream.buffer(stream.debounce(1, TimeUnit.SECONDS));

}).subscribe((Action1) (taps) {
_showTapCount(taps.size());
});

// start listening to events now
tapEventEmitter.connect();

RxJava2 源码解析(二)

概述

承接上一篇RxJava2 源码解析(一),
本系列我们的目的:

知道源头(Observable)是如何将数据发送出去的。
知道终点(Observer)是如何接收到数据的。
何时将源头和终点关联起来的
知道线程调度是怎么实现的
知道操作符是怎么实现的

本篇计划讲解一下4,5.

RxJava*强大的莫过于它的线程调度 和 花式操作符。
map操作符

map是一个高频的操作符,我们首先拿他开刀。
例子如下,源头Observable发送的是String类型的数字,利用map转换成int型,*终在终点Observer接受到的也是int类型数据。:

final Observable<String> testCreateObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(“1”);
e.onComplete()
}
});

Observable<Integer> map = testCreateObservable.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
map.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe() called with: d = [” + d + “]”);
}

@Override
public void onNext(Integer value) {
Log.d(TAG, “onNext() called with: value = [” + value + “]”);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, “onError() called with: e = [” + e + “]”);
}

@Override
public void onComplete() {
Log.d(TAG, “onComplete() called”);
}
});

我们看一下map函数的源码:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判空略过
ObjectHelper.requireNonNull(mapper, “mapper is null”);
//RxJavaPlugins.onAssembly()是hook 上文提到过
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

RxJavaPlugins.onAssembly()是hook 上文提到过,所以我们只要看ObservableMap,它就是返回到我们手里的Observable:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
//将function变换函数类保存起来
final Function<? super T, ? extends U> function;

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//super()将上游的Observable保存起来 ,用于subscribeActual()中用。
super(source);
this.function = function;
}

@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}

它继承自AbstractObservableWithUpstream,该类继承自Observable,很简单,就是将上游的ObservableSource保存起来,做一次wrapper,所以它也算是装饰者模式的提现,如下:

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
//将上游的`ObservableSource`保存起来
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}

关于ObservableSource,代表了一个标准的无背压的 源数据接口,可以被Observer消费(订阅),如下:

public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}

所有的Observable都已经实现了它,所以我们可以认为Observable和ObservableSource在本文中是相等的:

public abstract class Observable<T> implements ObservableSource<T> {

所以我们得到的ObservableMap对象也很简单,就是将上游的Observable和变换函数类Function保存起来。
Function的定义超级简单,就是一个接口,给我一个T,还你一个R.

public interface Function<T, R> {
R apply(T t) throws Exception;
}

本例写的是将String->int.

重头戏,subscribeActual()是订阅真正发生的地方,ObservableMap如下编写,就一句话,用MapObserver订阅上游Observable。:

@Override
public void subscribeActual(Observer<? super U> t) {
//用MapObserver订阅上游Observable。
source.subscribe(new MapObserver<T, U>(t, function));
}

MapObserver也是装饰者模式,对终点(下游)Observer修饰。

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//super()将actual保存起来
super(actual);
//保存Function变量
this.mapper = mapper;
}
@Override
public void onNext(T t) {
//done在onError 和 onComplete以后才会是true,默认这里是false,所以跳过
if (done) {
return;
}
//默认sourceMode是0,所以跳过
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
//下游Observer接受的值
U v;
//这一步执行变换,将上游传过来的T,利用Function转换成下游需要的U。
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), “The mapper function returned a null value.”);
} catch (Throwable ex) {
fail(ex);
return;
}
//变换后传递给下游Observer
actual.onNext(v);
}

到此我们梳理一下流程:
订阅的过程,是从下游到上游依次订阅的。

即终点 Observer 订阅了 map 返回的ObservableMap。
然后map的Observable(ObservableMap)在被订阅时,会订阅其内部保存上游Observable,用于订阅上游的Observer是一个装饰者(MapObserver),内部保存了下游(本例是终点)Observer,以便上游发送数据过来时,能传递给下游。
以此类推,直到源头Observable被订阅,根据上节课内容,它开始向Observer发送数据。

数据传递的过程,当然是从上游push到下游的,

源头Observable传递数据给下游Observer(本例就是MapObserver)
然后MapObserver接收到数据,对其变换操作后(实际的function在这一步执行),再调用内部保存的下游Observer的onNext()发送数据给下游
以此类推,直到终点Observer。

线程调度subscribeOn

简化问题,代码如下:

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, “subscribe() called with: e = [” + e + “]” + Thread.currentThread());
e.onNext(“1”);
e.onComplete();
}
//只是在Observable和Observer之间增加了一句线程调度代码
}).subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe() called with: d = [” + d + “]”);
}
@Override
public void onNext(String value) {
Log.d(TAG, “onNext() called with: value = [” + value + “]”);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, “onError() called with: e = [” + e + “]”);
}
@Override
public void onComplete() {
Log.d(TAG, “onComplete() called”);
}
});

只是在Observable和Observer之间增加了一句线程调度代码:.subscribeOn(Schedulers.io()).
查看subscribeOn()源码:

public final Observable<T> subscribeOn(Scheduler scheduler) {
//判空略过
ObjectHelper.requireNonNull(scheduler, “scheduler is null”);
//抛开Hook,重点还是ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

等等,怎么有种似曾相识的感觉,大家可以把文章向上翻,看看map()的源码。
和subscribeOn()的套路如出一辙,那么我们根据上面的结论,
先猜测ObservableSubscribeOn类也是一个包装类(装饰者),点进去查看:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
//保存线程调度器
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//map的源码中我们分析过,super()只是简单的保存ObservableSource
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//1  创建一个包装Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//2  手动调用 下游(终点)Observer.onSubscribe()方法,所以onSubscribe()方法执行在 订阅处所在的线程
s.onSubscribe(parent);
//3 setDisposable()是为了将子线程的操作加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//4 此时已经运行在相应的Scheduler 的线程中
source.subscribe(parent);
}
}));
}

和map套路大体一致,ObservableSubscribeOn自身同样是个包装类,同样继承AbstractObservableWithUpstream。
创建了一个SubscribeOnObserver类,该类按照套路,应该也是实现了Observer、Disposable接口的包装类,让我们看一下:

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
//真正的下游(终点)观察者
final Observer<? super T> actual;
//用于保存上游的Disposable,以便在自身dispose时,连同上游一起dispose
final AtomicReference<Disposable> s;

SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}

@Override
public void onSubscribe(Disposable s) {
//onSubscribe()方法由上游调用,传入Disposable。在本类中赋值给this.s,加入管理。
DisposableHelper.setOnce(this.s, s);
}

//直接调用下游观察者的对应方法
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}

//取消订阅时,连同上游Disposable一起取消
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
//这个方法在subscribeActual()中被手动调用,为了将Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}

这两个类根据上一节的铺垫加上注释,其他都好理解,稍微不好理解的应该是下面两句代码:

//ObservableSubscribeOn类
//3 setDisposable()是为了将子线程的操作加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//4 此时已经运行在相应的Scheduler 的线程中
source.subscribe(parent);
}
}));

//SubscribeOnObserver类
//这个方法在subscribeActual()中被手动调用,为了将Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}

其中scheduler.scheduleDirect(new Runnable()..)方法源码如下:

/**
* Schedules the given task on this scheduler non-delayed execution.
* …..
*/
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

从注释和方法名我们可以看出,这个传入的Runnable会立刻执行。
再继续往里面看:

public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
//class Worker implements Disposable ,Worker本身是实现了Disposable
final Worker w = createWorker();
//hook略过
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//开始在Worker的线程执行任务,
w.schedule(new Runnable() {
@Override
public void run() {
try {
//调用的是 run()不是 start()方法执行的线程的方法。
decoratedRun.run();
} finally {
//执行完毕会 dispose()
w.dispose();
}
}
}, delay, unit);
//返回Worker对象
return w;
}

createWorker()是一个抽象方法,由具体的Scheduler类实现,例如IoScheduler对应的Schedulers.io().

public abstract Worker createWorker();

初看源码,为了了解大致流程,不宜过入深入,先点到为止。
OK,现在我们总结一下scheduler.scheduleDirect(new Runnable()..)的重点:

传入的Runnable是立刻执行的。
返回的Worker对象就是一个Disposable对象,
Runnable执行时,是直接手动调用的 run(),而不是 start()方法.
上一点应该是为了,能控制在run()结束后(包括异常终止),都会自动执行Worker.dispose().

而返回的Worker对象也会被parent.setDisposable(…)加入管理中,以便在手动dispose()时能取消线程里的工作。

我们总结一下subscribeOn(Schedulers.xxx())的过程:

返回一个ObservableSubscribeOn包装类对象
上一步返回的对象被订阅时,回调该类中的subscribeActual()方法,在其中会立刻将线程切换到对应的Schedulers.xxx()线程。
在切换后的线程中,执行source.subscribe(parent);,对上游(终点)Observable订阅
上游(终点)Observable开始发送数据,根据RxJava2 源码解析(一),上游发送数据仅仅是调用下游观察者对应的onXXX()方法而已,所以此时操作是在切换后的线程中进行。

一点扩展,
大家可能看过一个结论:
subscribeOn(Schedulers.xxx())切换线程N次,总是以*次为准,或者说离源Observable*近的那次为准,并且对其上面的代码生效(这一点对比的ObserveOn())。

为什么?
– 因为根据RxJava2 源码解析(一)中提到,订阅流程从下游往上游传递
– 在subscribeActual()里开启了Scheduler的工作,source.subscribe(parent);,从这一句开始切换了线程,所以在这之上的代码都是在切换后的线程里的了。
– 但如果连续切换,*上面的切换*晚执行,此时线程变成了*上面的subscribeOn(xxxx)指定的线程,
– 而数据push时,是从上游到下游的,所以会在离源头*近的那次subscribeOn(xxxx)的线程里push数据(onXXX())给下游。

可写如下代码验证:

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, “subscribe() called with: e = [” + e + “]” + Thread.currentThread());
e.onNext(“1”);
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
//依然是io线程
Log.d(TAG, “apply() called with: s = [” + s + “]” + Thread.currentThread());
return s;
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe() called with: d = [” + d + “]”);
}
@Override
public void onNext(String value) {
Log.d(TAG, “onNext() called with: value = [” + value + “]”);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, “onError() called with: e = [” + e + “]”);
}
@Override
public void onComplete() {
Log.d(TAG, “onComplete() called”);
}
});

线程调度observeOn

在上一节的基础上,增加一个observeOn(AndroidSchedulers.mainThread()),就完成了观察者线程的切换。

.subscribeOn(Schedulers.computation())
//在上一节的基础上,增加一个ObserveOn
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {

继续看源码吧,我已经能猜出来了,hook+new XXXObservable();

public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
….
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

果然,查看ObservableObserveOn,:
高能预警,这部分的代码 有些略多,建议读者打开源码边看边读。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
//本例是 AndroidSchedulers.mainThread()
final Scheduler scheduler;
//默认false
final boolean delayError;
//默认128
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
// false
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//1 创建出一个 主线程的Worker
Scheduler.Worker w = scheduler.createWorker();
//2 订阅上游数据源,
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

本例中,就是两步:

创建一个AndroidSchedulers.mainThread()对应的Worker
用ObserveOnObserver订阅上游数据源。这样当数据从上游push下来,会由ObserveOnObserver对应的onXXX()处理。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//下游的观察者
final Observer<? super T> actual;
//对应Scheduler里的Worker
final Scheduler.Worker worker;
//上游被观察者 push 过来的数据都存在这里
SimpleQueue<T> queue;
Disposable s;
//如果onError了,保存对应的异常
Throwable error;
//是否完成
volatile boolean done;
//是否取消
volatile boolean cancelled;
// 代表同步发送 异步发送
int sourceMode;
….
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//省略大量无关代码
//创建一个queue 用于保存上游 onNext() push的数据
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//回调下游观察者onSubscribe方法
actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
//1 执行过error / complete 会是true
if (done) {
return;
}
//2 如果数据源类型不是异步的, 默认不是
if (sourceMode != QueueDisposable.ASYNC) {
//3 将上游push过来的数据 加入 queue里
queue.offer(t);
}
//4 开始进入对应Workder线程,在线程里 将queue里的t 取出 发送给下游Observer
schedule();
}

@Override
public void onError(Throwable t) {
//已经done 会 抛异常 和 上一篇文章里提到的一样
if (done) {
RxJavaPlugins.onError(t);
return;
}
//给error存个值
error = t;
done = true;
//开始调度
schedule();
}

@Override
public void onComplete() {
//已经done 会 返回  不会crash 和上一篇文章里提到的一样
if (done) {
return;
}
done = true;
//开始调度
schedule();
}

void schedule() {
if (getAndIncrement() == 0) {
//该方法需要传入一个线程, 注意看本类实现了Runnable的接口,所以查看对应的run()方法
worker.schedule(this);
}
}
//从这里开始,这个方法已经是在Workder对应的线程里执行的了
@Override
public void run() {
//默认是false
if (outputFused) {
drainFused();
} else {
//取出queue里的数据 发送
drainNormal();
}
}

void drainNormal() {
int missed = 1;

final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;

for (;;) {
// 1 如果已经 终止 或者queue空,则跳出函数,
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}

for (;;) {
boolean d = done;
T v;

try {
//2 从queue里取出一个值
v = q.poll();
} catch (Throwable ex) {
//3 异常处理 并跳出函数
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
//4 再次检查 是否 终止  如果满足条件 跳出函数
if (checkTerminated(d, empty, a)) {
return;
}
//5 上游还没结束数据发送,但是这边处理的队列已经是空的,不会push给下游 Observer
if (empty) {
//仅仅是结束这次循环,不发送这个数据而已,并不会跳出函数
break;
}
//6 发送给下游了
a.onNext(v);
}

//7 对不起这里我也不是很明白,大致猜测是用于 同步原子操作 如有人知道 烦请告知
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

//检查 是否 已经 结束(error complete), 是否没数据要发送了(empty 空),
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
//如果已经disposed
if (cancelled) {
queue.clear();
return true;
}
// 如果已经结束
if (d) {
Throwable e = error;
//如果是延迟发送错误
if (delayError) {
//如果空
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
//停止worker(线程)
worker.dispose();
return true;
}
} else {
//发送错误
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
//发送complete
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}

核心处都加了注释,总结起来就是,

ObserveOnObserver实现了Observer和Runnable接口。
在onNext()里,先不切换线程,将数据加入队列queue。然后开始切换线程,在另一线程中,从queue里取出数据,push给下游Observer
onError() onComplete()除了和RxJava2 源码解析(一)提到的一样特性之外,也是将错误/完成信息先保存,切换线程后再发送。
所以observeOn()影响的是其下游的代码,且多次调用仍然生效。
因为其切换线程代码是在Observer里onXXX()做的,这是一个主动的push行为(影响下游)。
关于多次调用生效问题。对比subscribeOn()切换线程是在subscribeActual()里做的,只是主动切换了上游的订阅线程,从而影响其发射数据时所在的线程。而直到真正发射数据之前,任何改变线程的行为,都会生效(影响发射数据的线程)。所以subscribeOn()只生效一次。observeOn()是一个主动的行为,并且切换线程后会立刻发送数据,所以会生效多次.

转载请标明出处:
http://blog.csdn.net/zxt0601/article/details/61637439
本文出自:【张旭童的博客】(http://blog.csdn.net/zxt0601)

总结

本文带大家走读分析了三个东西:

map操作符原理:

内部对上游Observable进行订阅
内部订阅者接收到数据后,将数据转换,发送给下游Observer.
操作符返回的Observable和其内部订阅者、是装饰者模式的体现。
操作符数据变换的操作,也是发生在订阅后。

线程调度subscribeOn():

内部先切换线程,在切换后的线程中对上游Observable进行订阅,这样上游发送数据时就是处于被切换后的线程里了。
也因此多次切换线程,*后一次切换(离源数据*近)的生效。
内部订阅者接收到数据后,直接发送给下游Observer.
引入内部订阅者是为了控制线程(dispose)
线程切换发生在Observable中。

线程调度observeOn():

使用装饰的Observer对上游Observable进行订阅
在Observer中onXXX()方法里,将待发送数据存入队列,同时请求切换线程处理真正push数据给下游。
多次切换线程,都会对下游生效。

源码里那些实现了Runnable的类或者匿名内部类,*终并没有像往常那样被丢给Thread类执行。
而是先切换线程,再直接执行Runnable的run()方法。
这也加深了我对面向对象,对抽象、Runnable的理解,它就是一个简简单单的接口,里面就一个简简单单的run(),
我认为,之所以有Runnable,只是抽象出 一个可运行的任务的概念。
也许这句话很平淡,书上也会提到,各位大佬早就知道,但是如今我顺着RxJava2的源码这么走读了一遍,确真真切切的感受到了这些设计思想的美妙。
———————

RxJava2 源码解析(一)

概述

*近决定找工作,准备面试中,打算看一看RxJava2的源码,遂有了这篇文章。

不会对RxJava2的源码逐字逐句的阅读,只寻找关键处,我们平时接触得到的那些代码。
背压实际中接触较少,故只分析了Observable.
分析的源码版本为:2.0.1

我们的目的:

知道源头(Observable)是如何将数据发送出去的。
知道终点(Observer)是如何接收到数据的。
何时将源头和终点关联起来的
知道线程调度是怎么实现的
知道操作符是怎么实现的

本文先达到目的1 ,2 ,3。
我个人认为主要还是适配器模式的体现,我们接触的就只有Observable和Observer,其实内部有大量的中间对象在适配:将它们两联系起来,加入一些额外功能,例如考虑dispose和hook等。
从create开始。

这是一段不涉及操作符和线程切换的简单例子:

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(“1”);
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, “onSubscribe() called with: d = [” + d + “]”);
}

@Override
public void onNext(String value) {
Log.d(TAG, “onNext() called with: value = [” + value + “]”);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, “onError() called with: e = [” + e + “]”);
}

@Override
public void onComplete() {
Log.d(TAG, “onComplete() called”);
}
});

拿 create来说,

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//…..
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

返回值是Observable,参数是ObservableOnSubscribe,定义如下:

public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}

ObservableOnSubscribe是一个接口,里面就一个方法,也是我们实现的那个方法:
该方法的参数是 ObservableEmitter,我认为它是关联起 Disposable概念的一层:

public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
}

ObservableEmitter也是一个接口。里面方法很多,它也继承了 Emitter<T> 接口。

public interface Emitter<T> {
void onNext(T value);
void onError(Throwable error);
void onComplete();
}

Emitter<T>定义了 我们在ObservableOnSubscribe中实现subscribe()方法里*常用的三个方法。

好,我们回到原点,create()方法里就一句话return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));,其中提到RxJavaPlugins.onAssembly():

/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook’s input value
* @return the value returned by the hook
*/
@SuppressWarnings({ “rawtypes”, “unchecked” })
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

可以看到这是一个关于hook的方法,关于hook我们暂且不表,不影响主流程,我们默认使用中都没有hook,所以这里就是直接返回source,即传入的对象,也就是new ObservableCreate<T>(source).

ObservableCreate我认为算是一种适配器的体现,create()需要返回的是Observable,而我现在有的是(方法传入的是)ObservableOnSubscribe对象,ObservableCreate将ObservableOnSubscribe适配成Observable。
其中subscribeActual()方法表示的是被订阅时真正被执行的方法,放后面解析:

public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

OK,至此,创建流程结束,我们得到了Observable<T>对象,其实就是ObservableCreate<T>.
到订阅subscribe 结束

subscribe():

public final void subscribe(Observer<? super T> observer) {

try {
//1 hook相关,略过
observer = RxJavaPlugins.onSubscribe(this, observer);

//2 真正的订阅处
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
//3 错误处理,
Exceptions.throwIfFatal(e);
// can’t call onError because no way to know if a Disposable has been set or not
// can’t call onSubscribe because the call might have set a Subscription already
//4 hook错误相关,略过
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException(“Actually not, but can’t throw other exceptions due to RS”);
npe.initCause(e);
throw npe;
}
}

关于hook的代码:
可以看到如果没有hook,即相应的对象是null,则是传入什么返回什么的。

/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook’s input value
* @param observer the observer
* @return the value returned by the hook
*/
@SuppressWarnings({ “rawtypes”, “unchecked” })
public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
//1 默认onObservableSubscribe(可理解为一个flatmap的操作)是null
BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;
//2 所以这句跳过,不会对其进行apply
if (f != null) {
return apply(f, source, observer);
}
//3 返回参数2
return observer;
}

我也是验证了一下 三个Hook相关的变量,确实是null:

Consumer<Throwable> errorHandler = RxJavaPlugins.getErrorHandler();
BiFunction<Observable, Observer, Observer> onObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
Function<Observable, Observable> onObservableAssembly = RxJavaPlugins.getOnObservableAssembly();

Log.e(TAG, “errorHandler = [” + errorHandler + “]”);
Log.e(TAG, “onObservableSubscribe = [” + onObservableSubscribe + “]”);
Log.e(TAG, “onObservableAssembly = [” + onObservableAssembly + “]”);

所以订阅时的重点就是:

//2 真正的订阅处
subscribeActual(observer);

我们将*节提到的ObservableCreate里的subscribeActual()方法拿出来看看:

@Override
protected void subscribeActual(Observer<? super T> observer) {
//1 创建CreateEmitter,也是一个适配器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2 onSubscribe()参数是Disposable ,所以CreateEmitter可以将Observer->Disposable 。还有一点要注意的是`onSubscribe()`是在我们执行`subscribe()`这句代码的那个线程回调的,并不受线程调度影响。
observer.onSubscribe(parent);
try {
//3 将ObservableOnSubscribe(源头)与CreateEmitter(Observer,终点)联系起来
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
//4 错误回调
parent.onError(ex);
}
}

Observer是一个接口,里面就四个方法,我们在开头的例子中已经全部实现(打印Log)。

public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T value);
void onError(Throwable e);
void onComplete();
}

重点在这一句:

//3 将ObservableOnSubscribe(源头)与CreateEmitter(Observer,终点)联系起来
source.subscribe(parent);

source即ObservableOnSubscribe对象,在本文中是:

new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(“1”);
e.onComplete();
}
}

则会调用parent.onNext() 和parent.onComplete(),parent是CreateEmitter对象,如下:

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {

//如果没有被dispose,会调用Observer的onNext()方法
if (!isDisposed()) {
observer.onNext(t);
}
}

@Override
public void onError(Throwable t) {

//1 如果没有被dispose,会调用Observer的onError()方法
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
//2 一定会自动dispose()
dispose();
}
} else {
//3 如果已经被dispose了,会抛出异常。所以onError、onComplete彼此互斥,只能被调用一次
RxJavaPlugins.onError(t);
}
}

@Override
public void onComplete() {
//1 如果没有被dispose,会调用Observer的onComplete()方法
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
//2 一定会自动dispose()
dispose();
}
}
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}

总结重点:

Observable和Observer的关系没有被dispose,才会回调Observer的onXXXX()方法
Observer的onComplete()和onError() 互斥只能执行一次,因为CreateEmitter在回调他们两中任意一个后,都会自动dispose()。根据*点,验证此结论。
Observable和Observer关联时(订阅时),Observable才会开始发送数据。
ObservableCreate将ObservableOnSubscribe(真正的源)->Observable.
ObservableOnSubscribe(真正的源)需要的是发射器ObservableEmitter.
CreateEmitter将Observer->ObservableEmitter,同时它也是Disposable.
先error后complete,complete不显示。 反之会crash,感兴趣的可以写如下代码验证。

e.onNext(“1”);
//先error后complete,complete不显示。 反之 会crash
//e.onError(new IOException(“sb error”));
e.onComplete();
e.onError(new IOException(“sb error”));

一个好玩的地方DisposableHelper

原本到这里,*简单的一个流程我们算是搞清了。
还值得一提的是,DisposableHelper.dispose(this);
DisposableHelper很有趣,它是一个枚举,这是利用枚举实现了一个单例disposed state,即是否disposed,如果Disposable类型的变量的引用等于DISPOSED,则起点和终点已经断开联系。
其中大多数方法 都是静态方法,所以isDisposed()方法的实现就很简单,直接比较引用即可.
其他的几个方法,和AtomicReference类搅基在了一起。
这是一个实现引用原子操作的类,对象引用的原子更新,常用方法如下:

//返回当前的引用。
V get()
//如果当前值与给定的expect引用相等,(注意是引用相等而不是equals()相等),更新为指定的update值。
boolean compareAndSet(V expect, V update)
//原子地设为给定值并返回旧值。
V getAndSet(V newValue)

OK,铺垫完了我们看看源码吧:

public enum DisposableHelper implements Disposable {
/**
* The singleton instance representing a terminal, disposed state, don’t leak it.
*/
DISPOSED
;

public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}

public static boolean dispose(AtomicReference<Disposable> field) {
//1 通过断点查看,默认情况下,field的值是”null”,并非引用是null哦!大坑大坑大坑
//但是current是null引用
Disposable current = field.get();
Disposable d = DISPOSED;
//2 null不等于DISPOSED
if (current != d) {
//3 field是DISPOSED了,current还是null
current = field.getAndSet(d);
if (current != d) {
//4 默认情况下 走不到这里,这里是在设置了setCancellable()后会走到。
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}

总结

在subscribeActual()方法中,源头和终点关联起来。
source.subscribe(parent);这句代码执行时,才开始从发送ObservableOnSubscribe中利用ObservableEmitter发送数据给Observer。即数据是从源头push给终点的。
CreateEmitter 中,只有Observable和Observer的关系没有被dispose,才会回调Observer的onXXXX()方法
Observer的onComplete()和onError() 互斥只能执行一次,因为CreateEmitter在回调他们两中任意一个后,都会自动dispose()。根据上一点,验证此结论。
先error后complete,complete不显示。 反之会crash
还有一点要注意的是onSubscribe()是在我们执行subscribe()这句代码的那个线程回调的,并不受线程调度影响。
———————

这可能是*好的RxJava 2.x 入门教程

前言

RxJava 对大家而言肯定不陌生,其受欢迎程度不言而喻。而在去年的早些时候,官方便宣布,将在一段时间后不再对 RxJava 1.x 进行维护,而在仓库中另辟蹊径,开始对 RxJava 2.x 进行推广起来,我原本是不想写这么一套教程的,因为 RxJava 受欢迎度这么高,而且这 2.x 也出来了这么久,我坚信网上一定有很多超级大牛早已为大家避雷。然而很难过的是,我搜索了些时间,能搜出来的基本都是对 RxJava 1.x 的讲解,或者是 Blog 标题就没说清楚是否是 2.x 系列(对于我们这种标题党来说很难受)。这不,我就来抛砖引玉了。

咱们先不提别的,先为大家带点可能你早已熟知的干货——来自扔物线大神的给Android开发者的 RxJava 详解。

该文详细地为大家讲解了 RxJava 的优势、原理以及使用方式和适用情景,一定被众多的 Android 开发者视为神器。可惜,文章历史比较久远,基本都是讲解的 RxJava 1.x了。

那关注的小伙伴一定会问,那我没用过 RxJava 1.x ,还有必要先学习 1.x 的内容吗?

个人觉得不必要,因为 RxJava 2.x 是按照 Reactive-Streams specification 规范完全的重写的,完全独立于 RxJava 1.x 而存在,它改变了以往 RxJava 的用法。

额,由于个人能力水平有限,所以对于英文基础好的,大家可以去官网查阅相关 API 介绍,而对于英文不那么流畅的童鞋,我也为大家准备了干货:RxJava2Examples (正在更新)。

与RxJava 1.x的差异

其实,我标题为入门教程,按理说应该从简单入门开始讲的,原谅我突然偏题了,因为我觉得可能大多数人都了解或者使用过RxJava 1.x(因为它真的太棒了)。虽然可能熟悉1.x 的你可以直接扒文档就可以了,但这么大的变化,请原谅我还在这里瞎比比。

  • Nulls
    这是一个很大的变化,熟悉 RxJava 1.x 的童鞋一定都知道,1.x 是允许我们在发射事件的时候传入 null 值的,但现在我们的 2.x 不支持了,不信你试试? 大大的 NullPointerException 教你做人。这意味着 Observable<Void> 不再发射任何值,而是正常结束或者抛出空指针。
  • 2、Flowable
    在 RxJava 1.x 中关于介绍 backpressure 部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用 Observable 。而在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压。
    或许对于背压,有些小伙伴们还不是特别理解,这里简单说一下。大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。感兴趣的小伙伴可以模拟这种情况,在差距太大的时候,我们的内存会猛增,直到OOM。而我们的 Flowable 一定意义上可以解决这样的问题,但其实并不能完全解决,这个后面可能会提到。
  • Single/Completable/Maybe
    其实这三者都差不多,Single 顾名思义,只能发送一个事件,和 Observable接受可变参数完全不同。而 Completable 侧重于观察结果,而 Maybe 是上面两种的结合体。也就是说,当你只想要某个事件的结果(true or false)的时候,你可以使用这种观察者模式。
  • 线程调度相关
    这一块基本没什么改动,但细心的小伙伴一定会发现,RxJava 2.x 中已经没有了 Schedulers.immediate()这个线程环境,还有 Schedulers.test()
  • Function相关
    熟悉 1.x 的小伙伴一定都知道,我们在1.x 中是有 Func1Func2…..FuncN的,但 2.x 中将它们移除,而采用 Function 替换了 Func1,采用 BiFunction 替换了 Func 2..N。并且,它们都增加了 throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要 try-catch 了。
  • 其他操作符相关
    如 Func1...N 的变化,现在同样用 Consumer 和 BiConsumer 对 Action1 和 Action2 进行了替换。后面的 Action 都被替换了,只保留了 ActionN

附录

下面从官方截图展示 2.x 相对 1.x 的改动细节,仅供参考。

%title插图%num
%title插图%num
%title插图%num
%title插图%num
%title插图%num
%title插图%num
%title插图%num
%title插图%num

RxJava 从入门到全解析

转载自:http://www.apkbus.com/blog-873055-77431.html

前言

使用了RxJava有一段时间了,深深感受到了其“牛逼”之处。下面,就从RxJava的基础开始,一步一步与大家分享一下这个强大的异步库的用法!

RxJava 概念初步

RxJava 在Github Repo上给的解释是:

“RxJava is a Java VM implementation of Reactive Extensions: 

a library for composing asynchronous and event-based programs by using observable sequences.”

大概就是说RxJava是Java VM上一个灵活的、使用可观测序列来组成的一个异步的、基于事件的库。咋一看好像不知道是啥东西… … 没事,往下看~

作用 – 异步

上面 这段解释,重点就在于异步!但是它又不像 AsyncTask 这样用法简单,所以刚接触RxJava的童鞋,可能会觉得特别难,无从下手,没事,相信通过这篇文章,大伙儿可以有一个比较深刻的理解!

RxJava精华可以浓缩为异步两个字,其核心的东西不外乎两个:

1.  Observable(被观察者) 

2.  Observer/Subscriber(观察者)

Observables可以发出一系列的 事件,这里的事件可以是任何东西,例如网络请求、复杂计算处理、数据库操作、文件操作等等,事件执行结束后交给 Observer/Subscriber 的回调处理。

模式 – 观察者模式

观察者模式是一种对象的行为模式,是 Java 设计模式中很常用的一个模式。观察者模式也常称为:

发布-订阅模式(Publish/Subscribe)

模型-视图模式(Model/View)

源-监听器模式(Source/Listener)

从属者模式(Dependents)

例如用过事件总线 EventBus 库的童鞋就知道,EventBus 属于发布-订阅模式(Publish/Subscribe)。

// 事件订阅@Subscribe(threadMode = ThreadMode.MAIN)public void showDownProgress(MyEvent event) {     // TODO}// 事件发布EventBus.getDefault().post(new MyEvent());

实际上,使用 RxJava 也可以设计出一套事件总线的库,这个称为 RxBus。有兴趣的话可以在学完 RxJava 之后,可以尝试写一个。这里就不细说了~

为啥说这个呢?因为,RxJava 也是一种扩展的观察者模式!

举个栗子,Android 中 View 的点击监听器的实现,View 是被观察者,OnClickListener 对象是观察者,Activity 要如何知道 View 被点击了?那就是构造一个 OnClickListener 对象,通过 setOnClickListener 与View达成一个订阅关系,一旦 View 被点击了,就通过OnClickListener对象的 OnClick 方法传达给 Activity 。采用观察者模式可以避免去轮询检查,节约有限的cpu资源。

结构 – 响应式编程

响应式?顾名思义,就是“你变化,我响应”。举个栗子,a = b + c; 这句代码将b+c的值赋给a,而之后如果b和c的值改变了不会影响到a,然而,对于响应式编程,之后b和c的值的改变也动态影响着a,意味着a会随着b和c的变化而变化。

响应式编程的组成为Observable/Operator/Subscriber,RxJava在响应式编程中的基本流程如下:

Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber

这个流程,可以简单的理解为:

  1. Observable 发出一系列事件,他是事件的产生者;
  2. Subscriber 负责处理事件,他是事件的消费者;
  3. Operator 是对 Observable 发出的事件进行修改和变换;
  4. 若事件从产生到消费不需要其他处理,则可以省略掉中间的 Operator,从而流程变为 Obsevable -> Subscriber
  5. Subscriber 通常在主线程执行,所以原则上不要去处理太多的事务,而这些复杂的事务处理则交给 Operator;

优势 – 逻辑简洁

Rx 优势可以概括为四个字,那就是 逻辑简洁。然而,逻辑简洁并不意味着代码简洁,但是,由于链式结构,一条龙,你可以从头到尾,从上到下,很清楚的看到这个连式结构的执行顺序。对于开发人员来说,代码质量并不在于代码量,而在于逻辑是否清晰简洁,可维护性如何,代码是否健壮!

另外,熟悉lambda的,还可以进一步提高代码的简洁性。举个简单栗子对比一下,暂时不需要过多理解,后面会一一道来:

// 不使用lambdaObservable.just("Hello World!")
     .map(new Func1<String, String>() {         @Override
         public String call(String s) {             return s + "I am kyrie!";
         }
     })
     .subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(new Action1<String>() {         @Override
         public void call(String s) {
             Log.i(TAG, s);
         }
     });// 使用lambdaObservable.just("Hello World!")
    .map(s -> s + "I am kyrie!")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(s -> {
        Log.i(TAG, s);
    });

RxJava 依赖

在 Android Studio 项目下,为 module 增加 Gradle 依赖。

// Android 平台下须引入的一个依赖,主要用于线程控制compile 'io.reactivex:rxandroid:1.1.0'// RxJavacompile 'io.reactivex:rxjava:1.1.5'

这是我项目里面用的版本,也可以到Maven/RxJava下获取*新版本。

RxJava 入门

前面讲了那么多,大家在概念上对RxJava有一个初步的认识就好,接下来,将为您解开RxJava神秘的面纱~~

无需过分纠结于“事件”这个词,暂时可以简单的把“事件”看成是一个值,或者一个对象。

  1. 事件产生,就是构造要传递的对象;
  2. 事件处理变换,就是改变传递的对象,可以改变对象的值,或是干脆创建个新对象,新对象类型也可以与源对象不一样;
  3. 事件处理,就是接收到对象后要做的事;

事件产生

RxJava创建一个事件比较简单,由 Observable 通过 create 操作符来创建。举个栗子,还是经典的 HelloWorld~~

// 创建一个ObservableObservable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {    @Override
    public void call(Subscriber<? super String> subscriber) {        // 发送一个 Hello World 事件
        subscriber.onNext("Hello World!");        // 事件发送完成
        subscriber.onCompleted();
    }
});

这段代码可以理解为, Observable 发出了一个类型为 String ,值为 “Hello World!” 的事件,仅此而已。

对于 Subscriber 来说,通常onNext()可以多次调用,*后调用onCompleted()表示事件发送完成。

上面这段代码,也可以通过just操作符进行简化。RxJava常用操作符后面会详细介绍,这里先有个了解。

// 创建对象,just里面的每一个参数,相当于调用一次Subscriber#OnNext()Observable<String> observable = Observable.just("Hello World!");

这样,是不是简单了许多?

事件消费

有事件产生,自然也要有事件消费。RxJava 可以通过 subscribe 操作符,对上述事件进行消费。首先,先创建一个观察者。

// 创建一个ObserverObserver<String> observer = new Observer<String>() {    @Override
    public void onCompleted() {
        Log.i(TAG, "complete");
    }    @Override
    public void onError(Throwable e) {

    }    @Override
    public void onNext(String s) {
        Log.i(TAG, s);
    }
};

或者

// 创建一个SubscriberSubscriber<String> subscriber = new Subscriber<String>() {    @Override
    public void onCompleted() {
        Log.i(TAG, "complete");
    }    @Override
    public void onError(Throwable e) {

    }    @Override
    public void onNext(String s) {
        Log.i(TAG, s);
    }
};
  1. Observer 是观察者, Subscriber 也是观察者,Subscriber 是一个实现了Observer接口的抽象类,对 Observer 进行了部分扩展,在使用上基本没有区别;
  2. Subscriber 多了发送之前调用的 onStart() 和解除订阅关系的 unsubscribe() 方法。
  3. 并且,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以在这之后的示例代码,都使用 Subscriber 来作为观察者。

事件订阅

*后,我们可以调用 subscribe 操作符, 进行事件订阅。

// 订阅事件observable.subscribe(subscriber);

在 Subscriber 实现的三个方法中,顾名思义,对应三种不同状态:
1. onComplete(): 事件全部处理完成后回调
2. onError(Throwable t): 事件处理异常回调
3. onNext(T t): 每接收到一个事件,回调一次

区分回调动作

对于事件消费事件订阅来说,好像为了打印一个“Hello World!”要费好大的劲… 其实,RxJava 自身提供了精简回调方式,我们可以为 Subscriber 中的三种状态根据自身需要分别创建一个回调动作 Action

// onComplete()Action0 onCompleteAction = new Action0() {    @Override
    public void call() {
        Log.i(TAG, "complete");
    }
};// onNext(T t)Action1<String> onNextAction = new Action1<String>() {    @Override
    public void call(String s) {
        Log.i(TAG, s);
    }
};// onError(Throwable t)Action1<Throwable> onErrorAction = new Action1<Throwable>() {    @Override
    public void call(Throwable throwable) {

    }
};

那么,RxJava 的事件订阅支持以下三种不完整定义的回调。

observable.subscribe(onNextAction);

observable.subscribe(onNextAction, onErrorAction);

observable.subscribe(onNextAction, onErrorAction, onCompleteAction);

我们可以根据当前需要,传入对应的 Action, RxJava 会相应的自动创建 Subscriber。

  1. Action0 表示一个无回调参数的Action;
  2. Action1 表示一个含有一个回调参数的Action;
  3. 当然,还有Action2 ~ Action9,分别对应2~9个参数的Action;
  4. 每个Action,都有一个 call() 方法,通过泛型T,来指定对应参数的类型;

入门示例

前面讲解了事件的产生到消费、订阅的过程,下面就举个完整的例子。从res/mipmap中取出一张图片,显示在ImageView上。

final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);

Observable.create(new Observable.OnSubscribe<Drawable>() {        @Override
        public void call(Subscriber<? super Drawable> subscriber) {            // 从mipmap取出一张图片作为Drawable对象
            Drawable drawable = ContextCompat.getDrawable(mContext, R.mipmap.ic_launcher);            // 把Drawable对象发送出去
            subscriber.onNext(drawable);

            subscriber.onCompleted();
        }
    })
    .subscribe(new Subscriber<Drawable>() {        @Override
        public void onCompleted() {

        }        @Override
        public void onError(Throwable e) {
            Log.i(TAG, e.toString());
        }        @Override
        public void onNext(Drawable drawable) {            // 接收到Drawable对象,显示在ImageView上
            ivLogo.setImageDrawable(drawable);
        }
    });

上面示例是RxJava*基本的一个用法。稍微消化一下,继续~~

RxJava 进阶

Scheduler线程控制

默认情况下,RxJava事件产生和消费均在同一个线程中,例如在主线程中调用,那么事件的产生和消费都在主线程。

那么问题来了,假如事件产生的过程是耗时操作,比如网络请求,结果显示在UI中,这个时候在主线程执行对于网络请求就不合适了,而在子线程执行,显示结果需要进行UI操作,同样不合适~~

所以,RxJava 的*个牛逼之处在于可以自由切换线程!那么,如何做?

在 RxJava 中,提供了一个名为 Scheduler 的线程调度器,RxJava 内部提供了4个调度器,分别是:

  1. Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程;
  2. Schedulers.newThread(): 开启新线程操作;
  3. Schedulers.immediate(): 默认指定的线程,也就是当前线程;
  4. Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU;
  5. AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;

我们可以通过 subscribeOn() 和 observeOn() 这两个方法来进行线程调度。举个栗子:

依然还是显示一张图片,不同的是,这次是从网络上加载图片

final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);

Observable.create(new Observable.OnSubscribe<Drawable>() {    @Override
    public void call(Subscriber<? super Drawable> subscriber) {        try {
            Drawable drawable = Drawable.createFromStream(new URL("https://ss2.baidu.com/6ONYsjip0QIZ8tyhnq/it/u=2502144641,437990411&fm=80&w=179&h=119&img.JPEG").openStream(), "src");
            subscriber.onNext(drawable);
        } catch (IOException e) {
            subscriber.onError(e);
        }
    }
})        // 指定 subscribe() 所在的线程,也就是上面call()方法调用的线程
        .subscribeOn(Schedulers.io())        // 指定 Subscriber 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Drawable>() {            @Override
            public void onCompleted() {

            }            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.toString());
            }            @Override
            public void onNext(Drawable drawable) {
                ivLogo.setImageDrawable(drawable);
            }
        });

所以,这段代码就做一件事,在 io 线程加载一张网络图片,加载完毕之后在主线程中显示到ImageView上。

变换

RxJava的又一牛逼之处,在于 变换。啥意思呢? 就是将发送的事件或事件序列,加工后转换成不同的事件或事件序列。

map操作符

变换的概念不好理解吧?举个简单的栗子,我们对上述示例 进行改写。

final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);

Observable.create(new Observable.OnSubscribe<String>() {    @Override
    public void call(Subscriber<? super String> subscriber) {

        subscriber.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");
    }
}).map(new Func1<String, Drawable>() {    @Override
    public Drawable call(String url) {        try {
            Drawable drawable = Drawable.createFromStream(new URL(url).openStream(), "src");            return drawable;
        } catch (IOException e) {

        }        return null;
    }
})        // 指定 subscribe() 所在的线程,也就是call()方法调用的线程
        .subscribeOn(Schedulers.io())        // 指定 Subscriber 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Drawable>() {            @Override
            public void onCompleted() {

            }            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.toString());
            }            @Override
            public void onNext(Drawable drawable) {                if (drawable != null) {
                    ivLogo.setImageDrawable(drawable);
                }
            }
        });

经过改写代码后,有什么变化呢? Observable 创建了一个 String 事件,也就是产生一个url,通过 map 操作符进行变换,返回Drawable对象,这个变换指的就是通过url进行网络图片请求,返回一个Drawable。所以简单的来说就是把String事件,转换为Drawable事件。逻辑表示就是:

Observable<String> --> map变换 --> Observable<Drawable>

那么,Func1 是什么呢?与 Action1 类似,不同的是 FuncX 有返回值,而 ActionX 没有。为什么需要返回值呢?目的就在于对象的变换,由String对象转换为Drawable对象。同样,也有Func0 ~ Func9,对应不同的参数个数。

当然了,RxJava 的变换,可不止于map这么简单,继续往下!

flatMap操作符

不难发现,上述的 map 操作符,是一对一的变换,并且返回的是变换后的对象。而 flatMap 操作符可以适应一对多,并且返回的是一个 Observable 。应用场景举例:例如一个员工负责多个任务,现在要打印所有员工的所有任务。

final List<Employee> list = new ArrayList<Employee>() {
    {
        add(new Employee("jackson", mission_list1));
        add(new Employee("sunny", mission_list2));
    }
};
Observable.from(list)
        .flatMap(new Func1<Employee, Observable<Employee.Mission>>() {            @Override
            public Observable<Employee.Mission> call(Employee employee) {                return Observable.from(employee.missions);
            }
        })
        .subscribe(new Subscriber<Employee.Mission>() {            @Override
            public void onCompleted() {

            }            @Override
            public void onError(Throwable e) {

            }            @Override
            public void onNext(Employee.Mission mission) {
                Log.i(TAG, mission.desc);
            }
        });

执行结果为顺序打印出两位员工的所有任务列表。

通过上面的代码可以看出,map 与 flatMap 这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:

  1. flatMap 返回的是一个Observable对象,而 map 返回的是一个普通转换后的对象;
  2. flatMap 返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件;而 map 变换后返回的对象直接发到Subscriber回调中;
  3. flatMap 变换后产生的每一个Observable对象发送的事件,*后都汇入同一个Observable,进而发送给Subscriber回调;
  4. map返回类型 与 flatMap 返回的Observable事件类型,可以与原来的事件类型一样;
  5. 可以对一个Observable多次使用 map 和 flatMap

鉴于 flatMap 自身强大的功能,这常常被用于 嵌套的异步操作,例如嵌套网络请求。传统的嵌套请求,一般都是在前一个请求的 onSuccess() 回调里面发起新的请求,这样一旦嵌套多个的话,缩进就是大问题了,而且严重的影响代码的可读性。而RxJava嵌套网络请求仍然通过链式结构,保持代码逻辑的清晰!举个栗子:

Github上的 README.md 文件,通常是 MarkDown 语法。我们要获取 README.md 内容并按 MarkDown 风格显示在UI上,就可以通过以下方式(Retrofit2 + RxJava,稍后会介绍):

new ReadmeContentClient()    // 获取md语法的Readme内容, 返回的是一个Observable<String>对象
    .getReadme()
    .flatMap(new Func1<String, Observable<String>>() {        @Override
        public Observable<String> call(String md) {            // 由于Readme的内容是md语法,需要转成html字符串通过WebView显示到UI
            // 返回的也是Observable<String>对象
            return new MarkDownStyleClient(md)
                            .formatMarkStyle();
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<String>() {        @Override
        public void onCompleted() {

        }        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "readme:" + e.toString());
        }        @Override
        public void onNext(String html) {            // html就是根据readme md格式内容,生成的html代码
            view.showReadme(html);
        }
    });

RxJava 其他常用操作符

  1. from
    接收一个集合作为输入,然后每次输出一个元素给subscriber。

    // Observable.from(T[] params)Observable.from(new Integer[]{1, 2, 3, 4, 5})
        .subscribe(new Action1<Integer>() {        @Override
            public void call(Integer number) {
                Log.i(TAG, "number:" + number);
            }
        });

    注意:如果from()里面执行了耗时操作,即使使用了subscribeOn(Schedulers.io()),仍然是在主线程执行,可能会造成界面卡顿甚至崩溃,所以耗时操作还是使用Observable.create(…);

  2. just
    接收一个可变参数作为输入,*终也是生成数组,调用from(),然后每次输出一个元素给subscriber。

    // Observable.just(T... params),params的个数为1 ~ 10Observable.just(1, 2, 3, 4, 5)
        .subscribe(new Action1<Integer>() {        @Override
            public void call(Integer number) {
                Log.i(TAG, "number:" + number);
            }
        });
  3. filter
    条件过滤,去除不符合某些条件的事件。举个栗子:

    Observable.from(new Integer[]{1, 2, 3, 4, 5})
        .filter(new Func1<Integer, Boolean>() {        @Override
            public Boolean call(Integer number) {            // 偶数返回true,则表示剔除奇数,留下偶数
                return number % 2 == 0;
            }
        })
        .subscribe(new Action1<Integer>() {        @Override
            public void call(Integer number) {
                Log.i(TAG, "number:" + number);
            }
        });
  4. take
    *多保留的事件数。
  5. doOnNext
    在处理下一个事件之前要做的事。

    Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12})
        .filter(new Func1<Integer, Boolean>() {        @Override
            public Boolean call(Integer number) {            // 偶数返回true,则表示剔除奇数
                return number % 2 == 0;
            }
        })    // *多保留三个,也就是*后剩三个偶数
        .take(3)
        .doOnNext(new Action1<Integer>() {        @Override
            public void call(Integer number) {            // 在输出偶数之前输出它的hashCode
                Log.i(TAG, "hahcode = " + number.hashCode() + "");
            }
        })
        .subscribe(new Action1<Integer>() {        @Override
            public void call(Integer number) {
                Log.i(TAG, "number = " + number);
            }
        });

    输出如下:

    hahcode = 2number = 2hahcode = 4number = 4hahcode = 6number = 6
  6. debounce
    通俗点讲,就是N个事件发生的时间间隔太近,就过滤掉前N-1个事件,保留*后一个事件。debounce可以指定这个时间间隔!可以用在SearchEditText请求关键词的地方,SearchEditText的内容变化太快,可以抵制频繁请求关键词,后面第15条15.Subject会介绍这个。为了演示效果,先举个简单栗子:

    Observable
        .create(new Observable.OnSubscribe<Integer>() {        @Override
            public void call(Subscriber<? super Integer> subscriber) {            int i = 0;            int[] times = new int[]{100, 1000};            while (true) {
                    i++;                if (i >= 100)                    break;
                    subscriber.onNext(i);                try {                    // 注意!!!!
                        // 当i为奇数时,休眠1000ms,然后才发送i+1,这时i不会被过滤掉
                        // 当i为偶数时,只休眠100ms,便发送i+1,这时i会被过滤掉
                        Thread.sleep(times[i % 2]);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                subscriber.onCompleted();
            }
        })    // 间隔400ms以内的事件将被丢弃
        .debounce(400, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Integer>() {        @Override
            public void onCompleted() {
                Log.i(TAG, "complete");
            }        @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.toString());
            }        @Override
            public void onNext(Integer integer) {
                Log.i(TAG, "integer = " + integer);
            }
        });

    输出结果:

    11-23 10:44:45.167 MainActivity: integer = 111-23 10:44:46.270 MainActivity: integer = 311-23 10:44:47.373 MainActivity: integer = 511-23 10:44:48.470 MainActivity: integer = 711-23 10:44:49.570 MainActivity: integer = 911-23 10:44:50.671 MainActivity: integer = 1111-23 10:44:51.772 MainActivity: integer = 1311-23 10:44:52.872 MainActivity: integer = 1511-23 10:44:53.973 MainActivity: integer = 17...

    我们设置过滤条件为400ms,可以发现,奇数正常输出,因为在它的下一个事件事件隔了1000ms,所以它不会被过滤掉;偶数被过滤掉,是因为它距离下一个事件(奇数)只隔了100ms。并且,输出的两个事件相隔大约为 100ms + 1000ms = 1100ms

  7. merge
    用于合并两个Observable为一个Observable。较为简单。

    Observable.merge(Observable1, Observable2)
        .subscribe(subscriber);
  8. concat
    顺序执行多个Observable,个数为1 ~ 9。例子稍后与first操作符一起~~
  9. compose
    与 flatMap 类似,都是进行变换,返回Observable对象,激活并发送事件。

    1. compose 是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用 compose 来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在 flatMap 中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。
    2. compose 是对 Observable 整体的变换,换句话说, flatMap 转换Observable里的每一个事件,而 compose 转换的是整个Observable数据流。
    3. flatMap 每发送一个事件都创建一个 Observable,所以效率较低。而 compose 操作符只在主干数据流上执行操作。
    4. 建议使用 compose 代替 flatMap
  10. first
    只发送符合条件的*个事件。可以与前面的contact操作符,做网络缓存。举个栗子:依次检查Disk与Network,如果Disk存在缓存,则不做网络请求,否则进行网络请求。

    // 从缓存获取Observable<BookList> fromDisk = Observable.create(new Observable.OnSubscribe<BookList>() {    @Override
        public void call(Subscriber<? super BookList> subscriber) {
            BookList list = getFromDisk();        if (list != null) {
                subscriber.onNext(list);
            } else {
                subscriber.onCompleted();
            }
        }
    });// 从网络获取Observable<BookList> fromNetWork = bookApi.getBookDetailDisscussionList();
    
    Observable.concat(fromDisk, fromNetWork)        // 如果缓存不为null,则不再进行网络请求。反之
            .first()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<BookList>() {            @Override
                public void onCompleted() {
    
                }            @Override
                public void onError(Throwable e) {
    
                }            @Override
                public void onNext(BookList discussionList) {
    
                }
            });

    网络缓存用法,具体可参见我的项目:https://github.com/JustWayward/BookReader

  11. timer
    可以做定时操作,换句话讲,就是延迟执行。事件间隔由timer控制。举个栗子:两秒后输出“Hello World!”

    Observable.timer(2, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {        @Override
            public void onCompleted() {
    
            }        @Override
            public void onError(Throwable e) {
    
            }        @Override
            public void onNext(Long aLong) {
                Log.i(TAG, "Hello World!");
            }
        });
  12. interval
    定时的周期性操作,与timer的区别就在于它可以重复操作。事件间隔由interval控制。举个栗子:每隔两秒输出“Hello World!”

    Observable.interval(2, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {        @Override
            public void onCompleted() {
    
            }        @Override
            public void onError(Throwable e) {
    
            }        @Override
            public void onNext(Long aLong) {
                Log.i(TAG, "Hello World!");
            }
        });
  13. throttleFirst
    与debounce类似,也是时间间隔太短,就丢弃事件。可以用于防抖操作,比如防止双击。

    RxView.clicks(button)
      .throttleFirst(1, TimeUnit.SECONDS)
      .subscribe(new Observer<Object>() {      @Override
          public void onCompleted() {
    
          }      @Override
          public void onError(Throwable e) {
    
          }      @Override
          public void onNext(Object o) {
               Log.i(TAG, "do clicked!");
          }
      });

    上面这个RxView详见:https://github.com/JakeWharton/RxBinding, 主要与RxJava结合用于一些View的事件绑定,JakeWharton大神的项目,厉害。

  14. Single
    Single与Observable类似,相当于是他的精简版。订阅者回调的不是OnNext/OnError/onCompleted,而是回调OnSuccess/OnError。

    Single.create(new Single.OnSubscribe<Object>() {    @Override
        public void call(SingleSubscriber<? super Object> subscriber) {
            subscriber.onSuccess("Hello");
        }
    }).subscribe(new SingleSubscriber<Object>() {    @Override
        public void onSuccess(Object value) {
            Log.i(TAG, value.toString());
        }    @Override
        public void onError(Throwable error) {
    
        }
    });
  15. Subject
    Subject这个类,既是Observable又是Observer,啥意思呢?就是它自身既是事件的生产者,又是事件的消费者,相当于自身是一条管道,从一端进,又从另一端出。举个栗子:PublishSubject

    Subject subject = PublishSubject.create();// 1.由于Subject是Observable,所以进行订阅subject.subscribe(new Subscriber<Object>() {    @Override
        public void onCompleted() {
    
        }    @Override
        public void onError(Throwable e) {
    
        }    @Override
        public void onNext(Object o) {
            Log.i(TAG, o.toString());
        }
    });// 2.由于Subject同时也是Observer,所以可以调用onNext发送数据subject.onNext("world");

    这个好像有点厉害的样子,哈哈。可以配合debounce,避免SearchEditText频繁请求。

    Subject subject = PublishSubject.create();
    
    subject.debounce(400, TimeUnit.MILLISECONDS)
            .subscribe(new Subscriber<Object>() {        @Override
            public void onCompleted() {
    
            }        @Override
            public void onError(Throwable e) {
    
            }        @Override
            public void onNext(Object o) {            // request
            }
        });
    
    edittext.addTextChangedListener(new TextWatcher() {    @Override 
        public void beforeTextChanged(CharSequence s, int start, int count, int after) { }    @Override 
        public void onTextChanged(CharSequence s, int start, int before, int count) {
            subject.onNext(s.toString());
        }    @Override 
        public void afterTextChanged(Editable s) { } 
    });

RxJava 应用

RxJava+Retrofit 的网络请求方式

Retrofit是一个非常适合RestAPI的网络请求库。没用过的童鞋,还是推荐学一学的。

使用Callback的请求方式:

// 1. 定义一个请求接口@GET("/match/stat")
Call<String> getMatchStat(@Query("mid") String mid, @Query("tabType") String tabType);// 2. 创建Service对象Retrofit retrofit = new Retrofit.Builder()
                        .baseUrl(BuildConfig.TENCENT_SERVER)// 加入RxJava支持
                        .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
                        .addConverterFactory(ScalarsConverterFactory.create())
                        .client(OkHttpHelper.getTecentClient()).build();

TencentApi api = retrofit.create(TencentApi.class);// 3. 调用Call<String> call = api.getMatchStat(mid, tabType);
call.enqueue(new Callback<String>() {    @Override
    public void onResponse(Call<String> call, Response<String> response) {        if(response != null && response.body()!=null)            // 成功
        } else {            // 无数据
        }
    }    @Override
    public void onFailure(Call<String> call, Throwable t) {        // 失败
    }
});

与 RxJava 结合的方式,则是

// 1. 定义请求接口,返回的是Observable对象@GET("/user/followers")
Observable<List<User>> followers();// 2. 同样是创建api对象...// 3. 请求api.followers()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<List<User>>() {        @Override
        public void onCompleted() {

        }        @Override
        public void onError(Throwable e) {            // 请求出错。可能发生网络异常、Json解析异常等等
        }        @Override
        public void onNext(List<User> list) {            // 请求成功
            view.showMyFollowers(list);
        }
    });

若需嵌套请求,比如先获取Token再进行才能进行登录,可参考flatMap操作符*后的获取Readme内容显示在WebView上的例子。

Retrofit2 + RxJava + Dagger2: 具体可参见我的项目,里面有比较详细的用法。
https://github.com/JustWayward/BookReader

不难发现,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 以及 OnCompleted() 或在请求失败后调用 onError()

:RxJava形式的请求,并不能减少代码量,但是逻辑非常清晰。假如请求到数据之后需要对数据进行处理,并且是耗时操作,难道要再开一个线程,或者用AsyncTask再做一次异步?很显然,RxJava的变换很好的解决了这个问题,依然会使逻辑结构清晰。

RxBus

准确的来说,是一种基于RxJava实现事件总线的一种思想。可以替代EventBus/Otto,因为他们都依赖于观察者模式。可以参考https://github.com/AndroidKnife/RxBus这个库。

RxBinding

前面介绍过了,JakeWharton大神的项目,https://github.com/JakeWharton/RxBinding, 主要与RxJava结合用于一些View的事件绑定。

RxJava 的一些坑

未取消订阅而引起的内存泄漏

举个栗子,对于前面常用操作符12.interval做周期性操作的例子,并没有使之停下来的,没有去控制订阅的生命周期,这样,就有可能引发内存泄漏。所以,在Activity#onDestroy()的时候或者不需要继续执行的时候应该取消订阅。

Subscription subscription = Observable.interval(2, TimeUnit.SECONDS)
    .subscribe(new Subscriber<Long>() {        @Override
        public void onCompleted() {

        }        @Override
        public void onError(Throwable e) {

        }        @Override
        public void onNext(Long aLong) {
            Log.i(TAG, "Hello World!");
        }
    });// 调用unsubscribe();方法进行取消订阅subscription.unsubscribe();

但是,如果有很多个数据源,那岂不是要取消很多次?当然不是的,可以利用 CompositeSubscription, 相当于一个 Subscription 集合。

CompositeSubscription list = new CompositeSubscription();
list.add(subscription1);
list.add(subscription2);
list.add(subscription3);// 统一调用一次unsubscribe,就可以把所有的订阅都取消list.unsubscribe();

总结

相信到了这里,大家对RxJava应该有了一个比较清晰的理解。当然,实践出真知,还是要去尝试,才能更深层次的体会到其强大之处。

*后,总结一下RxJava的基本使用过程。

  1. 首先是创建事件源源,也就是被观察者,可以用Observable的create/just/from等方法来创建;
  2. 通过filter/debounce等操作符,进行自定义事件过滤;
  3. 通过Schedules进行事件发送和订阅的线程控制,也就是subscribeOn() 和 observeOn();
  4. 通过map/flatMap/compose等操作符,进行事件的变换;
  5. 调用subscribe进行事件订阅;
  6. *后,不要忘了对订阅者生命周期的控制,不用的时候,记得调用unsubscribe(),以免引发内存泄漏。