Project Reactor 核心原理解析

一、开篇

本文将解析 Spring 的 Reactor 项目的源码。主要目的是让自己能深入理解 Reactor 这个项目,以及 Spring 5 和 Spring Boot 2。

Project Reactor 项目地址:https://github.com/reactor

Reactor 项目主要包含 Reactor Core 和 Reactor Netty 两部分。Reactor Core 实现了反应式编程的核心功能,Reactor Netty 则是 Spring WebFlux 等技术的基础。本文将介绍 Core 模块的核心原理。

本文从一个例子开始:

  1. public static void main(String[] args) {
  2. Flux.just(“tom”, “jack”, “allen”)
  3. .filter(s -> s.length() > 3)
  4. .map(s -> s.concat(“@qq.com”))
  5. .subscribe(System.out::println);
  6. }

整个 Project Reactor 的核心调用分为下面几个阶段:

  • 声明阶段
  • subscribe 阶段
  • onSubscribe 阶段
  • request 阶段
  • 调用阶段

接下来对每个阶段详细介绍。

二、声明阶段

简介

之前的文章介绍过,反应式编程和传统风格的编程一个*大的不同点在于,开发人员编写的大部分代码都是在声明处理过程。即这些代码并不会被实际的执行,直到开始被订阅。这便是为何*阶段是声明阶段的原因。

详解

先来看*个方法 Flux.just 的源码:

  1. public static <T> Flux<T> just(T… data) {
  2. return fromArray(data);
  3. }
  4. public static <T> Flux<T> fromArray(T[] array) {
  5. // 省略不重要的部分
  6. return onAssembly(new FluxArray<>(array));
  7. }

just 方法只是创建反应式流的众多方式的一个。在实际工作中,更常见的通过反应式 Repository 将数据库查询结果,或通过 Spring 5 的 WebClient 将 HTTP 调用结果*为流的开始。

onAssembly 是一个扩展机制,因为实例中并没有使用,所以可简单理解为将入参直接返回。

接下来 new FluxArray<>(array) 做的事情也很简单,仅仅是把入参赋给成员变量。

再接下来 filter(s -> s.length() > 3) 做的事情是把上一个 Flux,即 FluxArray,以及 s -> s.length() > 3 所表示的 Predicate 保存起来。

看一下源码

  1. public final Flux<T> filter(Predicate<? super T> p) {
  2. if (this instanceof Fuseable) {
  3. return onAssembly(new FluxFilterFuseable<>(this, p));
  4. }
  5. return onAssembly(new FluxFilter<>(this, p));
  6. }
  7. FluxFilterFuseable(Flux<? extends T> source,
  8. Predicate<? super T> predicate) {
  9. super(source);
  10. this.predicate =
  11. Objects.requireNonNull(predicate, “predicate”);
  12. }

这里有个逻辑判断,就是返回的值分为了 Fuseable 和非 Fuseable 两种。简单介绍一下,Fuseable 的意思就是可融合的。我理解就是 Flux 表示的是一个类似集合的概念,有一些集合类型可以将多个元素融合在一起,打包处理,而不必每个元素都一步步的处理,从而提高了效率。因为 FluxArray 中的数据一开始就都准备好了,因此可以打包处理,因此就是 Fuseable

而接下来的 map 操作也对应一个 FluxMapFusable。原理与 filter 操作,这里不再重复。

三、subscribe 阶段

简介

subscribe 阶段同行会触发数据发送。在本例中,后面可以看到,对于 FluxArray,数据发送很简单,就是循环发送。而对于像数据库、RPC 这样的长久,则会触发请求的发送。

详解

当调用 subscribe(System.out::println) 时,整个执行过程便进入 subscribe 阶段。经过一系列的调用之后,subscribe 动作会代理给具体的 Flux 来实现。就本例来说,会代理给 FluxMapFuseable,方法实现如下(经过简化):

  1. public void subscribe(CoreSubscriber<? super R> actual) {
  2. source.subscribe(
  3. new MapFuseableSubscriber<>(actual, mapper)
  4. );
  5. }

其中的 source 变量对应的是当前 Flux 的上一个。本例中,FluxMapFuseable 上一个是 FluxFilterFuseable

new MapFuseableSubscriber<>(actual, mapper) 则是将订阅了 FluxMapFuseable 的 Subscriber 和映射器封装在一起,组成一个新的 Subscriber。然后再订阅 source,即 FluxArraysource 是在上一个阶段被保存下来的。

这里强调一下 Publisher 接口中的 subscribe 方法语义上有些奇特,它表示的不是订阅关系,而是被订阅关系。即 aPublisher.subscribe(aSubscriber) 表示的是 aPublisher 被 aSubscriber 订阅。

接下来调用的就是 FluxFilterFuseable 中的 subscribe 方法,类似于 FluxMapFuseable,源码如下:

  1. public void subscribe(CoreSubscriber<? super T> actual) {
  2. source.subscribe(
  3. new FilterFuseableSubscriber<>(actual, predicate)
  4. );
  5. }

这时 source 就成了 FluxArray。于是,接下来是调用 FluxArray 的 subscribe 方法。

  1. public static <T> void subscribe(CoreSubscriber<? super T> s,
  2. T[] array) {
  3. if (array.length == 0) {
  4. Operators.complete(s);
  5. return;
  6. }
  7. if (s instanceof ConditionalSubscriber) {
  8. s.onSubscribe(
  9. new ArrayConditionalSubscription<>(
  10. (ConditionalSubscriber<? super T>) s, array
  11. )
  12. );
  13. }
  14. else {
  15. s.onSubscribe(new ArraySubscription<>(s, array));
  16. }
  17. }

*重要的部分还是 s.onSubscribe(new ArraySubscription<>(s, array))。不同于 FluxMapFuseable 和 FluxFilterFuseableFluxArray 没有再调用 subscribe 方法,因为它是数据源头。而 FluxMapFuseable 和 FluxFilterFuseable 则是中间过程。

可以这样简单理解,对于中间过程的 Mono/Flux,subscribe 阶段是订阅上一个 Mono/Flux;而对于源 Mono/Flux,则是要执行 Subscriber.onSubscribe(Subscription s) 方法。

四、onSubscribe 阶段

简介

在调用 FluxArray 的 subscribe 方法之后,执行过程便进入了 onSubscribe 阶段。onSubscribe 阶段指的是 Subscriber#onSubscribe 方法被依次调用的阶段。这个阶段会让各 Subscriber 知道 subscribe 方法已被触发,真正的处理流程马上就要开始。所以这一阶段的工作相对简单。

详解

s.onSubscribe(new ArraySubscription<>(s, array));

s 是 FilterFuseableSubscriber,看一下 FilterFuseableSubscriber 的 onSubscribe(Subscription s) 源码:

  1. public void onSubscribe(Subscription s) {
  2. if (Operators.validate(this.s, s)) {
  3. this.s = (QueueSubscription<T>) s;
  4. actual.onSubscribe(this);
  5. }
  6. }

actual 对应 MapFuseableSubscriberMapFuseableSubscriber 的 onSubscribe 方法也是这样,但 actual 对于的则是代表 System.out::println 的 LambdaSubscriber

调用过程:

FluxArray.subscribe -> FilterFuseableSubscriber.onSubscribe -> MapFuseableSubscriber.onSubscribe -> LambdaSubscriber.onSubscribe

五、request 阶段

含义

onSubscribe 阶段是表示订阅动作的方式,让各 Subscriber 知悉,准备开始处理数据。当*终的 Subscriber 做好处理数据的准备之后,它便会调用 Subscription 的 request 方法请求数据。

详解

下面是 LambdaSubscriber onSubscribe 方法的源码:

  1. public final void onSubscribe(Subscription s) {
  2. if (Operators.validate(subscription, s)) {
  3. this.subscription = s;
  4. if (subscriptionConsumer != null) {
  5. try {
  6. subscriptionConsumer.accept(s);
  7. }
  8. catch (Throwable t) {
  9. Exceptions.throwIfFatal(t);
  10. s.cancel();
  11. onError(t);
  12. }
  13. }
  14. else {
  15. s.request(Long.MAX_VALUE);
  16. }
  17. }
  18. }

由上可见 request 方法被调用。在本例中,这里的 s 是 MapFuseableSubscriber

这里需要说明,如 MapFuseableSubscriberFilterFuseableSubscriber,它们都有两个角色。一个角色是 Subscriber,另一个角色是 Subscription。因为它们都位于调用链的中间,本身并不产生数据,也不需要对数据暂存,但是需要对数据做各式处理。因此,在 onSubscribe、request 阶段,以及后面要讲到的调用阶段都需要起到代理的作用。这就解释了 actual.onSubscribe(this) onSubscribe 自己的原因。

下面是 FilterFuseableSubscriber 的 request 方法的源码。充分可见其代理的角色。

  1. public void request(long n) {
  2. s.request(n);
  3. }

*后 ArraySubscription 的 request 方法将被调用。

与 map、filter 等操作不同,flatMap 有比较大的差异,感兴趣的同学可以自己研究一下。本文先不详细介绍。

六、调用阶段

含义解释

这一阶段将会通过调用 Subscriber 的 onNext 方法,从而进行真正的反应式的数据处理。

流程分解

在 ArraySubscription 的 request 方法被调用之后,执行流程便开始了*后的调用阶段。

  1. public void request(long n) {
  2. if (Operators.validate(n)) {
  3. if (Operators.addCap(REQUESTED, this, n) == 0) {
  4. if (n == Long.MAX_VALUE) {
  5. fastPath();
  6. }
  7. else {
  8. slowPath(n);
  9. }
  10. }
  11. }
  12. }
  13. void fastPath() {
  14. final T[] a = array;
  15. final int len = a.length;
  16. final Subscriber<? super T> s = actual;
  17. for (int i = index; i != len; i++) {
  18. if (cancelled) {
  19. return;
  20. }
  21. T t = a[i];
  22. if (t == null) {
  23. s.onError(new NullPointerException(“The “
  24. + i
  25. + “th array element was null”)
  26. );
  27. return;
  28. }
  29. s.onNext(t);
  30. }
  31. if (cancelled) {
  32. return;
  33. }
  34. s.onComplete();
  35. }

ArraySubscription 会循环数据中的所有元素,然后调用 Subscriber 的 onNext 方法,将元素交由 Subscriber 链处理。

于是接下来由 FilterFuseableSubscriber 处理。下面是其 onNext 方法(做了一些简化)

  1. public void onNext(T t) {
  2. boolean b;
  3. try {
  4. b = predicate.test(t);
  5. }
  6. catch (Throwable e) {
  7. Throwable e_ =
  8. Operators.onNextError(t, e, this.ctx, s);
  9. if (e_ != null) {
  10. onError(e_);
  11. }
  12. else {
  13. s.request(1);
  14. }
  15. Operators.onDiscard(t, this.ctx);
  16. return;
  17. }
  18. if (b) {
  19. actual.onNext(t);
  20. }
  21. else {
  22. s.request(1);
  23. Operators.onDiscard(t, this.ctx);
  24. }
  25. }

其*要部分是通过 predicate 进行判断,如果满足条件,则交由下一个 Subscriber 处理。

下一个要处理的是 MapFuseableSubscriber,原理类似,不再重复。

*终要处理数据的是 LambdaSubscriber,下面是它的 onNext 方法源码:

  1. public final void onNext(T x) {
  2. try {
  3. if (consumer != null) {
  4. consumer.accept(x);
  5. }
  6. }
  7. catch (Throwable t) {
  8. Exceptions.throwIfFatal(t);
  9. this.subscription.cancel();
  10. onError(t);
  11. }
  12. }

consumer.accept(x) 会使数据打印在命令行中。

整个流程的解释到这里基本结束。

七、总结

接下来我们用一个图来回顾一下整体的流程。

Reactor Core

参考

需要感谢下面这篇文章,帮助我很好理解了 Project Reactor 的核心处理流程。

由表及里学 ProjectReactor:http://blog.yannxia.top/2018/06/26/java/spring/projectreactor/