iOS 代码格式化管理

虽然在项目创建和团队组建的初期,我们就把公共约定以及一些规范定下来了,并且由于我们的代码是通过Git来做版本控制的,web上直接就支持Markdown格式的readme文件,可以随时看到*新的版本,但是这种规范只能依靠个人的意识,或者通过代码Review来解决,而且做代码Review的时候,你也不好意思总是写上一堆诸如“这里要加个空格”、“那里要加上换行”的评论吧?如果不管,久而久之,会因为每个人的习惯不同,代码呈现出多种风格,看起来也不像一个成熟团队做出来的产品。

为了弥补Xcode代码格式化的短板,我们选择了引入一个第三方的插件:CLangFormat。

具体流程:

1. 先安装Package Manager(也可以跳过,看第2步)
官网地址:https://github.com/supermarin/Alcatraz

安装方法:在终端输入:curl -fsSL https://raw.github.com/supermarin/Alcatraz/master/Scripts/install.sh | sh

安装成功后在Xcode的Window里能看到“Package Manager”

2. 安装CLangFormat
GitHub地址:https://github.com/travisjeffery/ClangFormat-Xcode

安装方法:直接在Package Manager里搜索并安装,如果不想安装Package Manager的话,就直接把上面那个GitHub中的代码Clone下来,在Xcode中编译、运行,然后重启Xcode即可。

3. 配置CLangFormat
虽然CLangFormat本身就内置了一些标准化的代码格式化方案,但是同样可以自定义,我们就采用了自定义的方法。

具体的,在工程目录或者workspace目录下创建一个”.clang-format”文件,添加类似于以下内容的参数:

# 基础样式
BasedOnStyle: LLVM

# 缩进宽度
IndentWidth: 4

# 圆括号的换行方式
BreakBeforeBraces: Attach

# 支持一行的if
AllowShortIfStatementsOnASingleLine: true

# switch的case缩进
IndentCaseLabels: true

# 针对OC的block的缩进宽度
ObjCBlockIndentWidth: 4

# 针对OC,属性名后加空格
ObjCSpaceAfterProperty: true

# 每行字符的长度
ColumnLimit: 0

# 注释对齐
AlignTrailingComments: true

# 括号后加空格
SpaceAfterCStyleCast: true
然后在Xcode的“Edit”->“CLang Format”中选中“File”,并让倒数第二行显示“Disable Format On Save”。

后面这个看实际情况,需不需要在文件随时保存的时候格式化,如果喜欢用快捷键的话,在“系统偏好设置”里能对所有的Menu选项设置快捷键,设置一个“Format File in Focus”的快捷键也很好用。

附上CLangFormat的所有可用参数文档:http://clang.llvm.org/docs/ClangFormatStyleOptions.html

附上我目前所使用的格式:

# 基础样式
BasedOnStyle: LLVM

# 缩进宽度
IndentWidth: 4

# 圆括号的换行方式
BreakBeforeBraces: Attach

# 支持一行的if
AllowShortIfStatementsOnASingleLine: true

# switch的case缩进
IndentCaseLabels: true

# 针对OC的block的缩进宽度
ObjCBlockIndentWidth: 4

# 针对OC,属性名后加空格
ObjCSpaceAfterProperty: true

# 每行字符的长度
ColumnLimit: 0

# 注释对齐
AlignTrailingComments: true

# 括号后加空格
SpaceAfterCStyleCast: true

# 不在小括号里加空格
SpacesInParentheses: false

# 不在中括号里加空格
SpacesInSquareBrackets: false

 

RxJava系列(组合操作符)

这一章我们接着介绍组合操作符,这类operators可以同时处理多个Observable来创建我们所需要的Observable。组合操作符主要包含: Merge StartWith Concat Zip CombineLatest SwitchOnNext Join等等。

Merge

merge(Observable, Observable)将两个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。你可以简单的将它理解为两个Obsrvable合并成了一个Observable,合并后的数据是无序的。

%title插图%num
merge(Observable, Observable)

我们看下面的例子,一共有两个Observable:一个用来发送字母,另一个用来发送数字;现在我们需要两连个Observable发射的数据合并。

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }
        }).take(letters.length);

Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(5);

Observable.merge(letterSequence, numberSequence)
        .subscribe(new Observer<Serializable>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());
            }

            @Override
            public void onNext(Serializable serializable) {
                System.out.print(serializable.toString()+" ");
            }
        });

程序输出:

A 0 B C 1 D E 2 F 3 G H 4 

merge(Observable[])将多个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。

%title插图%num
merge(Observable[])

StartWith

startWith(T)用于在源Observable发射的数据前插入数据。使用startWith(Iterable<T>)我们还可以在源Observable发射的数据前插入Iterable。官方示意图:

%title插图%num
startWith(T) startWith(Iterable<T>)

startWith(Observable<T>)用于在源Observable发射的数据前插入另一个Observable发射的数据(这些数据会被插入到
源Observable发射数据的前面)。官方示意图:

%title插图%num
startWith(Observable<T>)

Concat

concat(Observable<? extends T>, Observable<? extends T>) concat(Observable<? extends Observable<T>>)用于将多个obserbavle发射的的数据进行合并发射,concat严格按照顺序发射数据,前一个Observable没发射玩是不会发射后一个Observable的数据的。它和merge、startWitch和相似,不同之处在于:

  1. merge:合并后发射的数据是无序的;
  2. startWitch:只能在源Observable发射的数据前插入数据。
    %title插图%num
    concat(Observable<? extends T>, Observable<? extends T>)、concat(Observable<? extends Observable<T>>)

这里我们将前面Merge操作符的例子拿过来,并将操作符换成Concat,然后我们看看执行结果:

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }
        }).take(letters.length);

Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(5);

Observable.concat(letterSequence, numberSequence)
        .subscribe(new Observer<Serializable>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());
            }

            @Override
            public void onNext(Serializable serializable) {
                System.out.print(serializable.toString() + " ");
            }
        });

程序输出:

A B C D E F G H 0 1 2 3 4 

Zip

zip(Observable, Observable, Func2)用来合并两个Observable发射的数据项,根据Func2函数生成一个新的值并发射出去。当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停在发射数据。

%title插图%num
zip(Observable, Observable, Func2)

和前面的例子一样,我们将操作符换成了zip:

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(120, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }
        }).take(letters.length);

Observable<Long> numberSequence = Observable.interval(200, TimeUnit.MILLISECONDS).take(5);

Observable.zip(letterSequence, numberSequence, new Func2<String, Long, String>() {
    @Override
    public String call(String letter, Long number) {
        return letter + number;
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.exit(0);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error:" + e.getMessage());
    }

    @Override
    public void onNext(String result) {
        System.out.print(result + " ");
    }
});

程序输出:

A0 B1 C2 D3 E4

CombineLatest

comnineLatest(Observable, Observable, Func2)用于将两个Observale*近发射的数据已经Func2函数的规则进展组合。下面是官方提供的原理图:

%title插图%num
comnineLatest(Observable, Observable, Func2)

下面这张图应该更容易理解:

%title插图%num
comnineLatest(Observable, Observable, Func2)
List<String> communityNames = DataSimulator.getCommunityNames();
List<Location> locations = DataSimulator.getLocations();

Observable<String> communityNameSequence = Observable.interval(1, TimeUnit.SECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return communityNames.get(position.intValue());
            }
        }).take(communityNames.size());
Observable<Location> locationSequence = Observable.interval(1, TimeUnit.SECONDS)
        .map(new Func1<Long, Location>() {
            @Override
            public Location call(Long position) {
                return locations.get(position.intValue());
            }
        }).take(locations.size());

Observable.combineLatest(
        communityNameSequence,
        locationSequence,
        new Func2<String, Location, String>() {
            @Override
            public String call(String communityName, Location location) {
                return "小区名:" + communityName + ", 经纬度:" + location.toString();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });

程序输出:

小区名:竹园新村, 经纬度:(21.827, 23.323)
小区名:康桥半岛, 经纬度:(21.827, 23.323)
小区名:康桥半岛, 经纬度:(11.923, 16.309)
小区名:中粮·海景壹号, 经纬度:(11.923, 16.309)
小区名:中粮·海景壹号, 经纬度:(22.273, 53.623)
小区名:浦江名苑, 经纬度:(22.273, 53.623)
小区名:南辉小区, 经纬度:(22.273, 53.623)

SwitchOnNext

switchOnNext(Observable<? extends Observable<? extends T>>用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射这多个小Observable所发射的数据。如果一个小的Observable正在发射数据的时候,源Observable又发射出一个新的小Observable,则前一个Observable发射的数据会被抛弃,直接发射新
的小Observable所发射的数据。

结合下面的原理图大家应该很容易理解,我们可以看到下图中的黄色圆圈就被丢弃了。

%title插图%num
switchOnNext(Observable<? extends Observable<? extends T>>)

Join

join(Observable, Func1, Func1, Func2)我们先介绍下join操作符的4个参数:

  • Observable:源Observable需要组合的Observable,这里我们姑且称之为目标Observable;
  • Func1:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了源Obsrvable发射出来的数据的有效期;
  • Func1:接收目标Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了目标Obsrvable发射出来的数据的有效期;
  • Func2:接收从源Observable和目标Observable发射出来的数据,并将这两个数据组合后返回。

所以Join操作符的语法结构大致是这样的:onservableA.join(observableB, 控制observableA发射数据有效期的函数, 控制observableB发射数据有效期的函数,两个observable发射数据的合并规则)

join操作符的效果类似于排列组合,把*个数据源A作为基座窗口,他根据自己的节奏不断发射数据元素,第二个数据源B,每发射一个数据,我们都把它和*个数据源A中已经发射的数据进行一对一匹配;举例来说,如果某一时刻B发射了一个数据“B”,此时A已经发射了0,1,2,3共四个数据,那么我们的合并操作就会把“B”依次与0,1,2,3配对,得到四组数据: [0, B] [1, B] [2, B] [3, B]

再看看下面的图是不是好理解了呢?!

%title插图%num
join(Observable, Func1, Func1, Func2)

读懂了上面的文字,我们再来写段代码加深理解。

final List<House> houses = DataSimulator.getHouses();//模拟的房源数据,用于测试

//用来每秒从houses总取出一套房源并发射出去
Observable<House> houseSequence =
        Observable.interval(1, TimeUnit.SECONDS)
                .map(new Func1<Long, House>() {
                    @Override
                    public House call(Long position) {
                        return houses.get(position.intValue());
                    }
                }).take(houses.size());//这里的take是为了防止houses.get(position.intValue())数组越界

//用来实现每秒发送一个新的Long型数据
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

houseSequence.join(tictoc,
        new Func1<House, Observable<Long>>() {
            @Override
            public Observable<Long> call(House house) {
                return Observable.timer(2, TimeUnit.SECONDS);
            }
        },
        new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.timer(0, TimeUnit.SECONDS);
            }
        },
        new Func2<House, Long, String>() {
            @Override
            public String call(House house, Long aLong) {
                return aLong + "-->" + house.getDesc();
            }
        }
).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.exit(0);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error:"+e.getMessage());
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

程序输出:

0-->中粮海景壹号新出大平层!总价4500W起
1-->中粮海景壹号新出大平层!总价4500W起
1-->满五唯一,黄金地段
2-->中粮海景壹号新出大平层!总价4500W起
2-->满五唯一,黄金地段
2-->一楼自带小花园
3-->一楼自带小花园
3-->毗邻汤臣一品
4-->毗邻汤臣一品
4-->顶级住宅,给您总统般尊贵体验
5-->顶级住宅,给您总统般尊贵体验
5-->顶层户型,两室一厅
6-->顶层户型,两室一厅
6-->南北通透,豪华五房
7-->南北通透,豪华五房

通过转换操作符、过滤操作符、组合操作符三个篇幅将RxJava主要的操作符也介绍的七七八八了。更多操作符的介绍建议大家去查阅官方文档,并自己动手实践一下。这一系列的文章也会持续更新,欢迎大家保持关注!:)

RxJava系列 从微观角度解读RxJava源码

前言

通过前面五个篇幅的介绍,相信大家对RxJava的基本使用以及操作符应该有了一定的认识。但是知其然还要知其所以然;所以从这一章开始我们聊聊源码,分析RxJava的实现原理。本文我们主要从三个方面来分析RxJava的实现:

  • RxJava基本流程分析
  • 操作符原理分析
  • 线程调度原理分析

本章节基于RxJava1.1.9版本的源码

一、RxJava执行流程分析

在RxJava系列2(基本概念及使用介绍)中我们介绍过,一个*基本的RxJava调用是这样的:

示例A

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello RxJava!");
        subscriber.onCompleted();
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("completed!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

首先调用Observable.create()创建一个被观察者Observable,同时创建一个OnSubscribe作为create()方法的入参;接着创建一个观察者Subscriber,然后通过subseribe()实现二者的订阅关系。这里涉及到三个关键对象和一个核心的方法:

  • Observable(被观察者)
  • OnSubscribe (从纯设计模式的角度来理解,OnSubscribe.call()可以看做是观察者模式中被观察者用来通知观察者的notifyObservers()方法)
  • Subscriber (观察者)
  • subscribe() (实现观察者与被观察者订阅关系的方法)

1、Observable.create()源码分析

首先我们来看看Observable.create()的实现:

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

这里创建了一个被观察者Observable,同时将RxJavaHooks.onCreate(f)作为构造函数的参数,源码如下:

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

我们看到源码中直接将参数RxJavaHooks.onCreate(f)赋值给了当前我们构造的被观察者Observable的成员变量onSubscribe。那么RxJavaHooks.onCreate(f)返回的又是什么呢?我们接着往下看:

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1<OnSubscribe, OnSubscribe> f = onObservableCreate;
    if (f != null) {
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

由于我们并没调用RxJavaHooks.initCreate(),所以上面代码中的onObservableCreate为null;因此RxJavaHooks.onCreate(f)*终返回的就是f,也就是我们在Observable.create()的时候new出来的OnSubscribe。(由于对RxJavaHooks的理解并不影响我们对RxJava执行流程的分析,因此在这里我们不做进一步的探讨。为了方便理解我们只需要知道RxJavaHooks一系列方法的返回值就是入参本身就OK了,例如这里的RxJavaHooks.onCreate(f)返回的就是f)。

至此我们做下逻辑梳理:Observable.create()方法构造了一个被观察者Observable对象,同时将new出来的OnSubscribe赋值给了该Observable的成员变量onSubscribe

2、Subscriber源码分析

接着我们看下观察者Subscriber的源码,为了增加可读性,我去掉了源码中的注释和部分代码。

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
    private final SubscriptionList subscriptions;//订阅事件集,所有发送给当前Subscriber的事件都会保存在这里
    
    ...

    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    ...

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }

    public void onStart() {
    }
    
    ...
}
public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

Subscriber实现了Subscription接口,从而对外提供isUnsubscribed()unsubscribe()方法。前者用于判断是否已经取消订阅;后者用于将订阅事件列表(也就是当前观察者的成员变量subscriptions)中的所有Subscription取消订阅,并且不再接受观察者Observable发送的后续事件。

3、subscribe()源码分析

前面我们分析了观察者和被观察者相关的源码,那么接下来便是整个订阅流程中**关键的环节了。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

    ...

    subscriber.onStart();
    
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

subscribe()方法中将传进来的subscriber包装成了SafeSubscriberSafeSubscriber其实是subscriber的一个代理,对subscriber的一系列方法做了更加严格的安全校验。保证了onCompleted()onError()只会有一个被执行且只执行一次,一旦它们其中方法被执行过后onNext()就不在执行了。

上述代码中*关键的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)。这里的RxJavaHooks和之前提到的一样,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)返回的正是他的第二个入参observable.onSubscribe,也就是当前observable的成员变量onSubscribe。而这个成员变量我们前面提到过,它是我们在Observable.create()的时候new出来的。所以这段代码可以简化为onSubscribe.call(subscriber)。这也印证了我在RxJava系列2(基本概念及使用介绍)中说的,onSubscribe.call(subscriber)中的subscriber正是我们在subscribe()方法中new出来的观察者。

到这里,我们对RxJava的执行流程做个总结:首先我们调用crate()创建一个观察者,同时创建一个OnSubscribe作为该方法的入参;接着调用subscribe()来订阅我们自己创建的观察者Subscriber
一旦调用subscribe()方法后就会触发执行OnSubscribe.call()。然后我们就可以在call方法调用观察者subscriberonNext(),onCompleted(),onError()

*后我用张图来总结下之前的分析结果:

%title插图%num
RxJava基本流程分析

二、操作符原理分析

之前我们介绍过几十个操作符,要一一分析它们的源码显然不太现实。在这里我抛砖引玉,选取一个相对简单且常用的map操作符来分析。

我们先来看一个map操作符的简单应用:

示例B

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
}).map(new Func1<Integer, String>() {
    @Override
    public String call(Integer integer) {
        return "This is " + integer;
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted!");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.getMessage());
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

为了便于表述,我将上面的代码做了如下拆解:

Observable<Integer> observableA = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
});

Subscriber<String> subscriberOne = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted!");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.getMessage());
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
};

Observable<String> observableB = 
        observableA.map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return "This is " + integer;;
                }
            });

observableB.subscribe(subscriberOne);

map()的源码和上一小节介绍的create()一样位于Observable这个类中。

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return create(new OnSubscribeMap<T, R>(this, func));
}

通过查看源码我们发现调用map()的时候实际上是创建了一个新的被观察者Observable,我们姑且称它为ObservableB;一开始通过Observable.create()创建的Observable我们称之为ObservableA。在创建ObservableB的时候同时创建了一个OnSubscribeMap,而ObservableA和变换函数Func1则作为构造OnSubscribeMap的参数。

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;//ObservableA
    
    final Func1<? super T, ? extends R> transformer;//map操作符中的转换函数Func1。T为转换前的数据类型,在上面的例子中为Integer;R为转换后的数据类型,在该例中为String。

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }
    
    @Override
    public void call(final Subscriber<? super R> o) {//结合*小节的分析结果,我们知道这里的入参o其实就是我们自己new的观察者subscriberOne。
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }
    
    static final class MapSubscriber<T, R> extends Subscriber<T> {
        
        final Subscriber<? super R> actual;//这里的actual就是我们在调用subscribe()时创建的观察者mSubscriber
        final Func1<? super T, ? extends R> mapper;//变换函数
        boolean done;
        
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        
        @Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            actual.onNext(result);
        }
        
        @Override
        public void onError(Throwable e) {
            ...
            actual.onError(e);
        }
        
        @Override
        public void onCompleted() {
            ...
            actual.onCompleted();
        }
        
        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

OnSubscribeMap实现了OnSubscribe接口,因此OnSubscribeMap就是一个OnSubscribe。在调用map()的时候创建了一个新的被观察者ObservableB,然后我们用ObservableB.subscribe(subscriberOne)订阅了观察者subscriberOne。结合我们在*小节的分析结果,所以OnSubscribeMap.call(o)中的o就是subscribe(subscriberOne)中的subscriberOne;一旦调用了ObservableB.subscribe(subscriberOne)就会执行OnSubscribeMap.call()

call()方法中,首先通过我们的观察者o和转换函数transformer构造了一个MapSubscriber,*后调用了source也就是observableAunsafeSubscribe()方法。即observableA订阅了一个观察者MapSubscriber

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        ...
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

上面这段代码*终执行了onSubscribe也就是OnSubscribeMapcall()方法,call()方法中的参数就是之前在OnSubscribeMap.call()中new出来的MapSubscriber。*后在call()方法中执行了我们自己的业务代码:

subscriber.onNext(1);
subscriber.onCompleted();

其实也就是执行了MapSubscriberonNext()onCompleted()

@Override
public void onNext(T t) {
    R result;
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        ...
        return;
    }
    actual.onNext(result);
}

onNext(T t)方法中的的mapper就是变换函数,actual就是我们在调用subscribe()时创建的观察者subscriberOne。这个T就是我们例子中的IntegerR就是String。在onNext()中首先调用变换函数mapper.call()T转换成R(在我们的例子中就是将Integer类型的1转换成了String类型的“This is 1”);接着调用subscriberOne.onNext(String result)。同样在调用MapSubscriber.onCompleted()时会执行subscriberOne.onCompleted()。这样就完成了一直完成的调用流程。

我承认太啰嗦了,花费了这么大的篇幅才将map()的转换原理解释清楚。我也是希望尽量的将每个细节都呈现出来方便大家理解,如果看我啰嗦了这么久还是没能理解,请看下面我画的这张执行流程图。

%title插图%num
加入Map操作符后的执行流程

三、线程调度原理分析

在前面的文章中我介绍过RxJava可以很方便的通过subscribeOn()observeOn()来指定数据流的每一部分运行在哪个线程。其中subscribeOn()指定了处理Observable的全部的过程(包括发射数据和通知)的线程;observeOn()指定了观察者的onNext()onError()onCompleted()执行的线程。接下来我们就分析分析源码,看看线程调度是如何实现的。

在分析源码前我们先看看一段常见的通过RxJava实现的线程调度代码:

示例C

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello RxJava!");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("completed!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

1、subscribeOn()源码分析

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ...
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

通过上面的代码我们可以看到,subscribeOn()map()一样是创建了一个新的被观察者Observable。因此我大致就能猜到subscribeOn()的执行流程应该和map()差不多,OperatorSubscribeOn肯定也是一个OnSubscribe。那我们接下来就看看OperatorSubscribeOn的源码:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;//线程调度器,用来指定订阅事件发送、处理等所在的线程
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                source.unsafeSubscribe(s);
            }
        });
    }
}

OperatorSubscribeOn实现了OnSubscribe接口,call()中对Subscriber的处理也和OperatorMapSubscriber的处理类似。首先通过scheduler构建了一个Worker;然后用传进来的subscriber构造了一个新的Subscriber s,并将s丢到Worker.schedule()中来处理;*后用原Observable去订阅观察者s。而这个Worker就是线程调度的关键!前面的例子中我们通过subscribeOn(Schedulers.io())指定了Observable发射处理事件以及通知观察者的一系列操作的执行线程,正是通过这个Schedulers.io()创建了我们前面提到的Worker。所以我们来看看Schedulers.io()的实现。

首先通过Schedulers.io()获得了ioScheduler并返回,上面的OperatorSubscribeOn通过这个的SchedulercreateWorker()方法创建了我们前面提到的Worker

public static Scheduler io() {
    return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}

接着我们看看这个ioScheduler是怎么来的,下面的代码向我们展现了是如何在Schedulers的构造函数中通过RxJavaSchedulersHook.createIoScheduler()来初始化ioScheduler的。

private Schedulers() {

    ...

    Scheduler io = hook.getIOScheduler();
    if (io != null) {
        ioScheduler = io;
    } else {
        ioScheduler = RxJavaSchedulersHook.createIoScheduler();
    }

    ...
}

*终RxJavaSchedulersHook.createIoScheduler()返回了一个CachedThreadScheduler,并赋值给了ioScheduler

public static Scheduler createIoScheduler() {
    return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
    ...
    return new CachedThreadScheduler(threadFactory);
}

到这一步既然我们知道了ioScheduler就是一个CachedThreadScheduler,那我们就来看看它的createWorker()的实现。

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

上面的代码向我们赤裸裸的呈现了前面OperatorSubscribeOn中的Worker其实就是EventLoopWorker。我们重点要关注的是他的scheduleActual()

static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
    private final CompositeSubscription innerSubscription = new CompositeSubscription();
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;
    final AtomicBoolean once;

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.once = new AtomicBoolean();
        this.threadWorker = pool.get();
    }

    ...

    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        ...
        ScheduledAction s = threadWorker.scheduleActual(new Action0() {
            @Override
            public void call() {
                if (isUnsubscribed()) {
                    return;
                }
                action.call();
            }
        }, delayTime, unit);
        innerSubscription.add(s);
        s.addParent(innerSubscription);
        return s;
    }
}

通过对源码的一步步追踪,我们知道了前面OperatorSubscribeOn.call()中的inner.schedule()*终会执行到ThreadWorkerscheduleActual()方法。

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
    Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
    ScheduledAction run = new ScheduledAction(decoratedAction);
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit(run);
    } else {
        f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);
    return run;
}

scheduleActual()中的ScheduledAction实现了Runnable接口,通过线程池executor*终实现了线程切换。上面便是subscribeOn(Schedulers.io())实现线程切换的全部过程。

2、observeOn()源码分析

observeOn()切换线程是通过lift来实现的,相比subscribeOn()在实现原理上相对复杂些。不过本质上*终还是创建了一个新的Observable

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ...
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

OperatorObserveOn作为OnSubscribeLift构造函数的参数用来创建了一个新的OnSubscribeLift对象,接下来我们看看OnSubscribeLift的实现:

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            o.onError(e);
        }
    }
}

OnSubscribeLift继承自OnSubscribe,通过前面的分析我们知道一旦调用了subscribe()将观察者与被观察绑定后就会触发被观察者所对应的OnSubscribecall()方法,所以这里会触发OnSubscribeLift.call()。在call()中调用了OperatorObserveOn.call()并返回了一个新的观察者Subscriber st,接着调用了前一级Observable对应OnSubscriber.call(st)

我们再看看OperatorObserveOn.call()的实现:

public Subscriber<? super T> call(Subscriber<? super T> child) {
    ...
    ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
    parent.init();
    return parent;
}

OperatorObserveOn.call()中创建了一个ObserveOnSubscriber并调用init()进行了初始化。

static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {

    ...

    @Override
    public void onNext(final T t) {
        ...
        schedule();
    }

    @Override
    public void onCompleted() {
        ...
        schedule();
    }

    @Override
    public void onError(final Throwable e) {
        ...
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            recursiveScheduler.schedule(this);
        }
    }

    @Override
    public void call() {
        long missed = 1L;
        long currentEmission = emitted;

        final Queue<Object> q = this.queue;
        final Subscriber<? super T> localChild = this.child;
        final NotificationLite<T> localOn = this.on;
        
        for (;;) {
            long requestAmount = requested.get();
            
            while (requestAmount != currentEmission) {
                boolean done = finished;
                Object v = q.poll();
                boolean empty = v == null;
                
                if (checkTerminated(done, empty, localChild, q)) {
                    return;
                }
                
                if (empty) {
                    break;
                }
                
                localChild.onNext(localOn.getValue(v));

                currentEmission++;
                if (currentEmission == limit) {
                    requestAmount = BackpressureUtils.produced(requested, currentEmission);
                    request(currentEmission);
                    currentEmission = 0L;
                }
            }
            
            if (requestAmount == currentEmission) {
                if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                    return;
                }
            }

            emitted = currentEmission;
            missed = counter.addAndGet(-missed);
            if (missed == 0L) {
                break;
            }
        }
    }
    
    ...
}

ObserveOnSubscriber继承自Subscriber,并实现了Action0接口。我们看到ObserveOnSubscriberonNext()onCompleted()onError()都有个schedule(),这个方法就是我们线程调度的关键;通过schedule()将新观察者ObserveOnSubscriber发送给subscriberOne的所有事件都切换到了recursiveScheduler所对应的线程,简单的说就是把subscriberOneonNext()onCompleted()onError()方法丢到了recursiveScheduler对应的线程中来执行。

那么schedule()又是如何做到这一点的呢?他内部调用了recursiveScheduler.schedule(this)recursiveScheduler其实就是一个Worker,和我们在介绍subscribeOn()时提到的worker一样,执行schedule()实际上*终是创建了一个runable,然后把这个runnable丢到了特定的线程池中去执行。在runnablerun()方法中调用了ObserveOnSubscriber.call(),看上面的代码大家就会发现在call()方法中*终调用了subscriberOneonNext()onCompleted()onError()方法。这便是它实现线程切换的原理。

好了,我们*后再看看示例C对应的执行流程图,帮助大家加深理解。

%title插图%num
RxJava执行流程

总结

这一章以执行流程操作符实现以及线程调度三个方面为切入点剖析了RxJava源码。下一章将站在更宏观的角度来分析整个RxJava的框架结构、设计思想等等。敬请期待~~ 🙂

RxJava系列 *佳实践

前言

有点标题党了,其实谈不上什么*佳实践。前段时间公司实行996,所以也没什么时间和精力来更新博客(好吧我承认是我懒)。因此这篇文章只是简单的通过两个例子介绍了RxJava在生产环境中的使用。不过本篇中的每个例子我都配上了完整的代码。

按照计划这一期是要介绍RxJava框架结构和设计思想的,但是考虑到Netflix将在十月底发布RxJava2.0正式版;因此决定将RxJava框架结构和设计思想分析放到2.0正式版发布后再做。后续我也会有一系列的文章来介绍RxJava1.x和2.x的区别。

示例一、获取手机上已安装的App

*个例子我们需要在Android设备上展示已安装的第三方app列表,关于环境搭建、依赖配置、RecyclerView的使用等这些基础内容我就不做陈述了。需要了解的同学可以去GitHub上把项目clone下来看看。这里我主要讲讲如何通过RxJava实现核心功能。

首选我们需要调用系统api来获取所有已安装的app,所以在OnSubscribecall方法中调用getApplicationInfoList()。但是getApplicationInfoList()获取的数据并不能完全满足我们的业务需求:

  1. 由于我们只需要展示手机上已安装的第三方App,因此需要通过filter操作符来过滤掉系统app;
  2. ApplicationInfo并不是我们所需要的类型,因此需要通过map操作符将其转换为AppInfo
  3. 由于获取ApplicationInfo、过滤数据、转换数据相对比较耗时,因此需要通过subscribeOn操作符将这一系列操作放到子线程中来处理;
  4. 而要将信息展示在页面上涉及到UI操作,因此需要通过observeOn操作符将onNextonCompletedonError调度到主线程,接着我们在这些方法中更新UI。

下面是核心代码:

final PackageManager pm = MainActivity.this.getPackageManager();
Observable.create(new Observable.OnSubscribe<ApplicationInfo>() {
        @Override
        public void call(Subscriber<? super ApplicationInfo> subscriber) {
            List<ApplicationInfo> infoList = getApplicationInfoList(pm);
            for (ApplicationInfo info : infoList) {
                subscriber.onNext(info);
            }
            subscriber.onCompleted();
        }
    }).filter(new Func1<ApplicationInfo, Boolean>() {
        @Override
        public Boolean call(ApplicationInfo applicationInfo) {
            return (applicationInfo.flags & ApplicationInfo.FLAG_SYSTEM) <= 0;
        }
    }).map(new Func1<ApplicationInfo, AppInfo>() {

        @Override
        public AppInfo call(ApplicationInfo applicationInfo) {
            AppInfo info = new AppInfo();
            info.setAppIcon(applicationInfo.loadIcon(pm));
            info.setAppName(applicationInfo.loadLabel(pm).toString());
            return info;
        }
    }).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<AppInfo>() {
        @Override
        public void onCompleted() {
            mAppListAdapter.notifyDataSetChanged();
            mPullDownSRL.setRefreshing(false);
        }

        @Override
        public void onError(Throwable e) {
            mPullDownSRL.setRefreshing(false);
        }

        @Override
        public void onNext(AppInfo appInfo) {
            mAppInfoList.add(appInfo);
        }
    });

程序执行效果图:

%title插图%num
效果图

完整的代码我放到了GitHub上,有兴趣大家可以去clone下来自己运行看看。

源码地址:https://github.com/BaronZ88/HelloRxAndroid

示例二、RxJava+Retrofit2实现获取天气数据

RxJava + Retrofit2几乎是Android应用开发的标配了,这个例子中我们就来聊聊这二者是如何配合起来帮助我们快速开发的。

Retrofit2中一个标准的接口定义是这样的:

@GET("weather")
Observable<Weather> getWeather(@Query("cityId") String cityId);

现在有了RxJava,一个基本的网络请求我们便可以这样实现:

ApiClient.weatherService.getWeather(cityId)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Weather>() {
                    @Override
                    public void call(Weather weather) {
                        weatherView.displayWeatherInformation(weather);
                    }
                });

但有时候可能一开始我们并不知道cityId,我们只知道cityName。所以就需要我们先访问服务器,拿到对应城市名的cityId,然后通过这个cityId再去获取天气数据。

同样的,我们需要定义一个获取cityId的接口:

@GET("city")
Observable<String> getCityIdByName(@Query("cityName") String cityName);

紧接着我们便可以使用无所不能的RxJava来实现需求了。

ApiClient.weatherService.getCityIdByName("上海")
             .flatMap(new Func1<String, Observable<Weather>>() {
                 @Override
                 public Observable<Weather> call(String cityId) {
                     return ApiClient.weatherService.getWeather(cityId);
                 }
             }).subscribeOn(Schedulers.io())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Action1<Weather>() {
                 @Override
                 public void call(Weather weather) {
                     weatherView.displayWeatherInformation(weather);
                 }
             });

哇哦!~ so easy!!!妈妈再也不用担心….

 

WeatherStyle这个项目还在开发中,这个项目不只包含了RxJava和Retrofit的使用,同时还包含MVP、ORMLite、RetroLambda、ButterKnife等等开源库的使用

RxJava1.X的系列文章就到此结束了,由于本人对RxJava的理解有限,这一系列文章中如有错误还请大家指正。在使用RxJava过程中有任何疑问也欢迎大家和我交流。共同学习!共同进步!

 

RxJava 和 RxAndroid (生命周期控制和内存优化)

RxJava使我们很方便的使用链式编程,代码看起来既简洁又优雅。但是RxJava使用起来也是有副作用的,使用越来越多的订阅,内存开销也会变得很大,稍不留神就会出现内存溢出的情况,这篇文章就是介绍Rxjava使用过程中应该注意的事项。

1、取消订阅 subscription.unsubscribe() ;

package lib.com.myapplication;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;

public class MainActivity extends AppCompatActivity {

    Subscription subscription ;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        subscription =  Observable.just( "123").subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println( "tt--" + s );
            }
        }) ;
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        //取消订阅
        if ( subscription != null ){
            subscription.unsubscribe();
        }
    }
}

2、线程调度

  • Scheduler调度器,相当于线程控制器
    • Schedulers.immediate() : 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    • Schedulers.newThread() :总是启用新线程,并在新线程执行操作.
    • Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation() : 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • 还有RxAndroid里面专门提供了AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
  • 常见的场景:为了不阻塞UI,在子线程加载数据,在主线线程显示数据
          Observable.just( "1" , "2" , "3" )
                .subscribeOn(Schedulers.io())  //指定 subscribe() 发生在 IO 线程
                .observeOn( AndroidSchedulers.mainThread() )  //指定 Subscriber 的回调发生在主线程
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        textView.setText( s );
                    }
                }) ;
    

    上面这段代码,数据”1″、”2″、”3″将在io线程中发出,在android主线程中接收数据。这种【后台获取数据,前台显示数据】模式适用于大多数的程序策略。

  • Scheduler 自由多次切换线程。恩,这个更为牛逼
    Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
     .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.newThread())
     .map(mapOperator) // 新线程,由 observeOn() 指定
     .observeOn(Schedulers.io())
     .map(mapOperator2) // IO 线程,由 observeOn() 指定
     .observeOn(AndroidSchedulers.mainThread) 
     .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定
    

    从上面的代码可以看出

    • observeOn() 可以调用多次来切换线程,observeOn 决定他下面的方法执行时所在的线程。
    • subscribeOn() 用来确定数据发射所在的线程,位置放在哪里都可以,但它是只能调用一次的。

  • 上面介绍了两种控制Rxjava生命周期的方式,*种:取消订阅 ;第二种:线程切换 。这两种方式都能有效的解决android内存的使用问题,但是在实际的项目中会出现很多订阅关系,那么取消订阅的代码也就越来越多。造成了项目很难维护。所以我们必须寻找其他可靠简单可行的方式,也就是下面要介绍的。

3、rxlifecycle 框架的使用

  • github地址: https://github.com/trello/RxLifecycle
  • 在android studio 里面添加引用
    compile 'com.trello:rxlifecycle-components:0.6.1'
  • 让你的activity继承RxActivity,RxAppCompatActivity,RxFragmentActivity
    让你的fragment继承RxFragment,RxDialogFragment;下面的代码就以RxAppCompatActivity举例
  • bindToLifecycle 方法
    在子类使用Observable中的compose操作符,调用,完成Observable发布的事件和当前的组件绑定,实现生命周期同步。从而实现当前组件生命周期结束时,自动取消对Observable订阅。

     public class MainActivity extends RxAppCompatActivity {
            TextView textView ;
            
            @Override
            protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            textView = (TextView) findViewById(R.id.textView);
        
            //循环发送数字
            Observable.interval(0, 1, TimeUnit.SECONDS)
                .subscribeOn( Schedulers.io())
                .compose(this.<Long>bindToLifecycle())   //这个订阅关系跟Activity绑定,Observable 和activity生命周期同步
                .observeOn( AndroidSchedulers.mainThread())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        System.out.println("lifecycle--" + aLong);
                        textView.setText( "" + aLong );
                    }
                });
           }
        }
    

    上面的代码是Observable循环的发送数字,并且在textview中显示出来
    1、没加 compose(this.<Long>bindToLifecycle()) 当Activiry 结束掉以后,Observable还是会不断的发送数字,订阅关系没有解除
    2、添加compose(this.<Long>bindToLifecycle()) 当Activity结束掉以后,Observable停止发送数据,订阅关系解除。

  • 从上面的例子可以看出bindToLifecycle() 方法可以使Observable发布的事件和当前的Activity绑定,实现生命周期同步。也就是Activity 的 onDestroy() 方法被调用后,Observable 的订阅关系才解除。那能不能指定在Activity其他的生命状态和订阅关系保持同步,答案是有的。就是 bindUntilEvent()方法。这个逼装的好累!
  • bindUntilEvent( ActivityEvent event)
    • ActivityEvent.CREATE: 在Activity的onCreate()方法执行后,解除绑定。
    • ActivityEvent.START:在Activity的onStart()方法执行后,解除绑定。
    • ActivityEvent.RESUME:在Activity的onResume()方法执行后,解除绑定。
    • ActivityEvent.PAUSE: 在Activity的onPause()方法执行后,解除绑定。
    • ActivityEvent.STOP:在Activity的onStop()方法执行后,解除绑定。
    • ActivityEvent.DESTROY:在Activity的onDestroy()方法执行后,解除绑定。
     //循环发送数字
         Observable.interval(0, 1, TimeUnit.SECONDS)
                 .subscribeOn( Schedulers.io())
                 .compose(this.<Long>bindUntilEvent(ActivityEvent.STOP ))   //当Activity执行Onstop()方法是解除订阅关系
                 .observeOn( AndroidSchedulers.mainThread())
                 .subscribe(new Action1<Long>() {
                     @Override
                     public void call(Long aLong) {
                         System.out.println("lifecycle-stop-" + aLong);
                         textView.setText( "" + aLong );
                     }
                 });
    

    经过测试发现,当Activity执行了onStop()方法后,订阅关系已经解除了。
    上面说的都是订阅事件与Activity的生命周期同步,那么在Fragment里面又该怎么处理的?

  • FragmentEvent 这个类是专门处理订阅事件与Fragment生命周期同步的大杀器
    public enum FragmentEvent {
    
    ATTACH,
    CREATE,
    CREATE_VIEW,
    START,
    RESUME,
    PAUSE,
    STOP,
    DESTROY_VIEW,
    DESTROY,
    DETACH
    }
    

    可以看出FragmentEvent 和 ActivityEvent 类似,都是枚举类,用法是一样的。这里就不举例了!

总结
1、这三篇文章的相关代码示例都在 http://git.oschina.net/zyj1609/RxAndroid_RxJava
2、通过上面的三种方法,我相信你在项目中使用Rxjava的时候,已经能够很好的控制了 Rxjava对内存的开销。如果你有其他的方法或者问题,可以留言给我。

RxJava Subject

Subject

Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

由于一个Subject订阅一个Observable,它可以触发这个Observable开始发射数据(如果那个Observable是”冷”的–就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject可以把原来那个”冷”的Observable变成”热”的。

Subject的种类

针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在,而且一些实现使用其它的命名约定(例如,在RxScala中Subject被称作PublishSubject)。

AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的*后一个值。(如果原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这*后一个值发射给任何后续的观察者。

%title插图%num

然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。 %title插图%num

BehaviorSubject

当观察者订阅BehaviorSubject时,它开始发射原始Observable*近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。 %title插图%num

然而,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。 %title插图%num

PublishSubject

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始Observable的所有数据都被分发,你需要这样做:或者使用Create创建那个Observable以便手动给它引入”冷”Observable的行为(当所有观察者都已经订阅时才开始发射数据),或者改用ReplaySubject。 %title插图%num

如果原始的Observable因为发生了一个错误而终止,PublishSubject将不会发射任何数据,只是简单的向前传递这个错误通知。 %title插图%num

ReplaySubject

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。

如果你把ReplaySubject当作一个观察者使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。

%title插图%num

RxJava的对应类

假设你有一个Subject,你想把它传递给其它的代理或者暴露它的Subscriber接口,你可以调用它的asObservable方法,这个方法返回一个Observable。具体使用方法可以参考Javadoc文档。

串行化

如果你把 Subject 当作一个 Subscriber 使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。

要避免此类问题,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:

mySafeSubject = new SerializedSubject( myUnsafeSubject );

外链流量是如何判断限制的?

比如某网站限制外链流量 1G/月, 说明媒体链接每月*多被访问到 1G.

它的服务器是如何统计外链流量的呢?

如果在网站内部被访问, 则不会被统计到流量, 只有单独访问(是吗)会被流量统计, 还是只有在非本站访问才会被流量统计? 比如手机浏览器里长按图片, 会单独打开这张图片, 这样会被统计到流量里吗?

技术上, 通过 header 里面 referer 字段可以进行判断吗? 但这样也有漏洞, 可以修改 header 字段来超额获取它. 那技术上是用什么实现的呢?

流量 header 统计 访问3 条回复 • 2021-07-02 16:47:34 +08:00
est 1
est 7 天前
大部分流量是普通人,普通人不会无聊得去修改 header
FaiChou 2
FaiChou 7 天前
@est 的确, 但作为服务端, 得做下限制吧, 防止非普通用户. 有哪些技术能有效限制住?
est 3
est 7 天前
@FaiChou 非普通用户就看你预算了。上不封顶。如果有人铁了心要搞你你也没辙只能用钱抵抗。。。

从 DSM 到 Truenas,高硬件低软件的 Nas 手记

需求分析与我的使用场景

早期使用习惯:

大概从高中开始我就用群晖的东西来管理我的个人文件,设备先后从 Gen8 黑群晖 —— DS 216j —— 之前的 Gen10 Plus 黑群晖
在这个时间段中,主要用途为: 群晖搭配 Download Station 下载东西
使用 Samba 作为分享给 Windows/电视盒子 的协议

现在的需求:

面向局域网内容器共享硬盘空间
足够快的媒体搜索,可方便的让我在不手动归类的情况下找到我想要的文件
提供面向对象存储的 API
稳定,可以接受复杂的首次部署但要简单的升级和日常维护
无明显的直接经济支出或者可以一次付款终身使用

  • High Hardware cost and Low Software cost

系统架构

硬件

Hpe Microserver Gen10 Plus

  • Intel E2146
  • 16G non-Reg ECC UDIMM x 2
  • Intel P3700
  • WD 4T x 2
  • Toshiba 8T x 2

硬件选型思考

首先这台机器放在我家客厅,我一个月大多数时候都在学校并且每次回家需要来回 4H 的通勤时长,以至于在这种情况下 IPMI 是硬需求。
虽然收一个 1U/2U 带 IPMI 的平台的确不错,不过噪音和体积在客厅就显得完全不合适了,而 Gen10p 摆电视柜下面刚刚好。

在使用过程中我还出现过升级固件导致平台下线不得不让我家人手动触发物理硬件开关的情况,不过总的来说。大部分日常运维操作(作死折腾),都可以依靠 IPMI 进行救援。

优点:真的好看,从外观到 ILO6,又小巧又能干的感觉,外置电源还可以很容易的藏在角落里。<del>是带尾巴的黑长直贫乳魅魔</del>
缺陷:Microserver Gen10 Plus *大的缺陷就是那可怜的内存和 PCI 扩展性,而且单 PCI 物理插槽在 HPE 的 BIOS 里只支持 x8x8 的 Bifurcation,直接导致了大部分的纯物理 PCI Split Card 都无法正常工作(包括我之前认为理论可能的使用 x4x4x4x4 Nvme split card 并只插 0 和 3 的方案,实际上在我的机器上仍然只有 Nvme 0 会被识别) 好在我并没有那么高的扩展要求,插个 P3700 就当没有 PCI 槽了。至于万兆,我觉得有那需求都不会考虑这种带有严格体积的四盘位存储机。

软件选型思考:

我选择了 TrueNas 作为新的文件服务操作系统,
首先不会继续考虑群晖了,事实上我之前一直使用 DSM 作为文件管理系统是因为手上有一台白裙可以服务降级,但 DSM6.2 以后的大面积翻车和常年停滞的黑裙安全性更新让我不再想继续选择
Ceph,我承认它足够的先进和分布式,但单机条件下,组件越多,单点故障率也的也越高
Unraid,不想考虑和群晖一样的破解方案了,以及在群聊的时候谈到这玩意的文件系统安全性也是灵车级别…

软件 ESXi 7.0

  • 原版 OpenWrt 19.07
    • 通过 ShellClash 进行分流代理可以解决我的联网需求,使用过 lede/koolshare 的魔改版,但是感觉添加的功能对我必要性不大
  • RHEL 8
    • 使用开发者计划领到的免费授权,安装 Podman 跑大部分的服务,推荐使用 podman-compose 这个工具直接用现成的 docker-compose.yml 完成很多现有应用的部署,并 generate 为 systemd services 进行自启动
  • TrueNas 11.2
  • 若干其他的测试折腾环境

文件迁移

迁移的过程中使用原来的 DS216j 作为数据拷贝的源机器

我准备了另外的硬盘并在 TrueNas 完成初始化并配置好 pool,这里直接创建 stripe data vdev 即可,如果数据大且十分重要,建议至少两块硬盘 MIRROR 来作为拷贝的中继

如果在群晖端执行 rsync 建议安装这个 synocli[https://synocommunity.com/package/synocli-net] 套件并全程使用 screen 执行以避免操作中断

我的命令以在 DS216j 上面执行 rsync 为演示

rsync -aHAXxv  --delete --progress -e ssh -T -o Compression=no -x [source_dir] [dest_host:dest_dir]

如果文件量比较大,建议拷贝前先在 TrueNas 的 Service 中打开 RSync 服务,并添加你拷贝的目标路径为 RSync Module

随后执行

rsync -aHAXxv  --delete --progress  [source_dir] [dest_host://module/dest_dir]

在我的测试工况下,1Gbps NIC 通过 ssh 协议拷贝峰值速度约为 22MBps,通过 rsync tcp 的峰值速度为 55MBps

随后拔下群晖的硬盘,插上并建立存储池,重复直到所有数据转移完成

工具的替代和使用习惯改变

File Station 替代品

我先后试用了 NextCloud,Firebrowser
更早之前还有可道云,Cloudreve,或者一些面板自带的文件管理等
*终还是拿 FileBrowser 做 Web 端的文件浏览
2323.png 缺点: MKV 没有字幕

Moments & Photo Station 替代品

这里我选择了 PhotoView 这个开源的方案,效果如下
20222-42-54.png

它支持 PWA,在移动设备上也能获得相对可以的体验

2021-4-27 254.png

缺点:
残废且占用*高 CPU 的人脸识别
只能读不能写,照片备份还需要 NextCloud 或者其他的客户端协同完成

Download Station 替代

我用 RHEL 8 跑起了 Qbitorrent Enhanced-Nox 用来订阅 RSS 下载的动漫,
PT 站我不是很常用,只是偶尔下一些不太好找的电影会去 MT,单独开了个 Transmission 用来挂 PT 的种子

对象文件存储

TrueNAS 官方 Plugin 里面有 Minio,直接安排(

服务暴露

我使用这个 https://nginxproxymanager.com/ 在一个高位端口转发我内网的大多数交互式 Web 服务
它同时支持图形化配置 Basic Auth 和简单的权限组 虽然在相当一段长的时间里我都沉溺于手写和折腾各种 Nginx 的 Configure trick
但是大多数时候手动配置只会给自己的维护添堵,尤其是想添加一个域名或者改一个转发接口就得连回去拷配置再调试 <del>Web 它不香吗</del>

而且证书的存放也是头疼的一件事,感觉找个容器把这些全关进去反倒能提高安全性和稳定性

感想和我踩的一些坑

  • Truenas 的内存占用真的非常恐怖,得益于 ZFS,这玩意甚至在我转移数据的时候吃满了分配的 16G 内存用作 ZFS cache 。
  • 我在转移下载盘的时候错误的开启了 ZFS 的 dedup,这个操作导致磁盘的拷贝 I/O 直接掉回了百兆时代。
  • 我*次给它分了 80G SSD 做系统空间,并期望它能利用好剩余的 SSD 分区用作 cache,
  • 理想很丰满,实际上这也是我对 Truenas 的误解。
  • 在查阅了很多博文后我了解到对于 ZFS 而言内存容量的影响远大于所谓的 SSD cache,
  • 因为这样,我重装了一下,并分配了 8G 的系统盘和 40G 的应用插件盘。
  • 以及 TrueNas 虽然表面上具备完善的 UI,但具体到文件管理,直接 SSH 进去一把梭反而是*方便的。
  • ACL 很容易把头搞炸但是配置成功了以后使用相当舒服,安定感十足的 BSD 。
  • NFS 在多终端同时挂载的工况下经常会出现一个文件 touch 提示存在而 cat/ls 却不存在的闹鬼情况
  • 推荐使用 WebDAV 作为内网挂载的协议(而且这玩意实际拷贝速度比 NFS 高好多)

需要一台服务器, 跑 gitlab、ci、mysql 之类的服务用于测试。现阶段是租用物理机好还是云服务器好呢?

如题。

第 1 条附言 · 36 天前
谢谢各位的建议。现在主要考虑的点是,如果用云主机的话会不会不方便公司内部程序的测试,如果是物理机的话宽带、服务器安全又是一个问题。比较纠结。
机好 MySQL gitlab 服务器11 条回复 • 2021-06-04 16:45:53 +08:00
cccp2020 1
cccp2020 36 天前
果断阿里云啊,*近很便宜,物理机还得考虑人力成本了: http://aakw.net/24mfx
beautwill 2
beautwill 36 天前
建议还是阿里或者腾讯云吧,v2er 专供折上折 v: beautwill
jingslunt 3
jingslunt 36 天前
win10 开个 10g 的虚拟机测试不香吗?没有外网需求 躺着多好,省得楼上收割。
CEBBCAT 4
CEBBCAT 36 天前
没用过物理机,但我觉得选云服务器应该差不了。物理机是不是还要操心系统安装之类的?云服务器从运维角度来讲应该会省心一些。

PS,一楼的链接是 AFF 链接: https://www.aliyun.com/activity/618/2021?userCode=0amqgcs9 去掉尾巴后是: https://www.aliyun.com/activity/618/2021

PSS 个人不推荐阿里云,可以看看腾讯云
zhengfan2016 5
zhengfan2016 36 天前
建议自己组一台 nas,弄个 10 代 i5 es 500 块,配个 matx 主板 500 块,2k 内搞定,美滋滋。有钱也可以买 29999 的 64 核县城撕裂者组 itx nas
chniccs 6
chniccs 36 天前
云吧,新用户一年的费用可能比你物理机的电费都便宜
limuyan44 7
limuyan44 36 天前
你这要求一年也就 99 块,费那些事干啥。
misaka19000 8
misaka19000 36 天前
@zhengfan2016 #5 能给个更详细一些的配置列表吗?谢谢♪(・ω・)ノ
q428202849 9
q428202849 36 天前
看你运算大不大
云和物理机我们都有可以找我
zhengfan2016 10
zhengfan2016 35 天前 ❤️ 1
@misaka19000 https://zhongce.sina.com.cn/article/view/79566/

11
Cbdy 35 天前 via Android
为什么不买个二手破电脑干这事儿

redis 的 6379 端口有没有什么办法能安全开放?

1
zilongzixue 40 天前
换个启动端口啊
ly4572615 2
ly4572615 39 天前
为啥密码没用,设置一个够长够复杂的
youzengwei 3
youzengwei 38 天前
防火墙白名单访问啊
opengps 4
opengps 38 天前
公网需要开发的后端服务都*好是配合 IP 白名单
ye4tar 5
ye4tar 37 天前
*准则就是限制外网访问,
如果需要外网访问可以在防火墙上对指定 IP 开放,没有防火墙的请使用 wireguard 做隧道,然后通过隧道的内网地址访问
巡查系统有没有被放置可以远程登录的 ssh 证书
限制 redis 端口错误登录的频率
记录各种可能被暴力猜解的服务的并限制频率,同时需要适当的告警机制
有条件的先格式化机器,重新设置 redis
使用非 root 用户启动 redis
Hardrain 6
Hardrain 10 天前
https://redis.io/topics/security

*条就是不要暴露在互联网上(bind 127.0.0.1)

如果不能做到这一条

考虑 rename 掉 config 命令,攻击者爆破密码登入后,用这个命令关闭 rdb 的压缩再修改 rdb 存储的位置,配合以会忽略无意义内容的脚本语言(如 php 忽略<?php ?>以外的内容,将其直接输出)便可以开个反弹代理然后完成挂马.

drop privilege(用低权限用户运行)也能阻止上面所述的挂马手段.

以上两种方法能一定程度防止攻击者利用 redis 挂马,但不能防止 redis 中的信息泄露.
使用 SSL 并验证客户端证书能对抗暴力破解,攻击者无法登入 redis,自然不存在信息泄露隐患