本文介绍Reactive Programming响应式编程,并基于Spring WebFlux进行工程实践

Signal信号 VS Element元素
Signal信号是响应式流协议中定义的一组事件Event,实现Publisher发布者、Subscriber订阅者之间的异步通信。而Element元素指的就是响应式流中传输的业务数据项Data Item。但在响应式流协议中,元素本身是不能独立存在、传输的。而是作为onNext信号的Payload出现,并通过onNext信号被推送给订阅者。故响应式流的本质就是一个异步的信号序列。通过这些信号来驱动数据(元素)的传输、状态的变迁。
响应式流协议中的信号可分为两类:Publisher发布者发出的信号、Subscriber订阅者发出的信号。常见的信号有:
- onSubscribe 订阅建立信号:
方向: Publisher → Subscriber
载荷:Subscription实例
说明:- 当Publisher发布者和Subscriber订阅者之间的建立订阅关系时,Publisher发布者向Subscriber订阅者发送的握手信号,该信号携带Subscription实例作为Payload载荷。后续,Subscriber订阅者通过该实例向Publisher发布者发送request/cancel信号
- 对于每个Subscriber订阅者而言,onSubscribe只允许出现1次。且该信号必须是第一个信号
- onNext 数据元素信号:
方向:Publisher → Subscriber
载荷:一个类型为T的数据元素Element
说明:- 该信号用于Publisher发布者向Subscriber订阅者异步推送一个数据元素。故所谓的发射元素,本质就是发射一个携带该元素的onNext信号
- 该信号可以出现0次或多次
- onComplete 正常完成信号:
方向:Publisher → Subscriber
载荷:无
说明:- 表明Publisher发布者已经完成所有数据的发布,且没有错误发生
- 该信号标志着流以成功状态终止
- Publisher发布者发送onComplete信号后,不能再发送任何其他信号
- 该信号最多出现1次
- onError信号 与 onComplete信号二者互斥,最多只能出现其中一个
- onError 异常终止信号:
方向:Publisher → Subscriber
载荷:Throwable 错误对象
说明:- 表明流在处理过程中出现错误。其将错误作为流中的一个事件进行传播,而不是通过传统的异常抛出机制
- 该信号携带Throwable错误对象作为Payload载荷,以供下游进行错误处理、日志记录
- 该信号标志流以失败状态终止
- Publisher发布者发送onError信号后,不能再发送任何其他信号
- 该信号最多出现1次
- onError信号 与 onComplete信号二者互斥,最多只能出现其中一个
- request 背压控制信号
方向:Subscriber → Publisher
载荷:请求数据的数量(long类型)
说明:- Subscriber订阅者通过Subscription实例向Publisher发布者发送的信号。用来通知Publisher发布者,其最多可以处理的新onNext信号的数量
- 用于实现Backpressure背压。实现了将数据流的控制权从Publisher发布者转移到Subscriber订阅者,使得Subscriber订阅者可根据自身处理能力来调节上游的推送速度和数量,避免下游被上游压垮
- cancel 取消信号
方向:Subscriber → Publisher
载荷:无
说明:- Subscriber订阅者通过Subscription实例向Publisher发布者发送的信号。用于通知Publisher发布者,其对后续任何的信号都不感兴趣,订阅关系需要提前终止。实现Subscriber订阅者在上游发送onComplete/onError信号前主动结束流
- Publisher发布者收到此信号后,应尽快停止发送任何信号并清理相关资源
- 该信号可以出现多次,但需保证幂等
需要强调的是,Publisher发布者、Subscriber订阅者这两个角色并不仅仅是指响应式链两端的起点和终点。处在中间的操作符既是发布者、也是订阅者,它订阅其上游,并作为新的发布者供其下游订阅。以下述代码为例:
- Flux.range():位于响应式链的最顶端,其是下游delayElements()的Publisher发布者
- delayElements():处于响应式链的中间。对于它的上游Flux.range()而言,其是Subscriber订阅者的角色;对于它的下游map()而言,其是Publisher发布者的角色
- map():处于响应式链的中间。对于它的上游delayElements()而言,其是Subscriber订阅者的角色;对于它的下游subscribe()而言,其是Publisher发布者的角色
- subscribe():位于响应式链的最底端,其是上游map()的Subscriber订阅者
1 | Flux.range(10,3) |
下述代码详细展示了响应式流中各种信号的实际交互过程
1 | import java.time.Duration; |
输出如下
1 | ============================== 正常完成的流 ============================== |
工程实践
脚手架
基于Spring Boot WebFlux搭建,POM依赖如下。重点:引入Spring Boot WebFlux依赖。数据库链接不能使用传统的JDBC,而应选择R2DBC
1 | <dependencyManagement> |
配置文件如下
1 | server: |
API接口统一包装类 ApiSingleResult如下
1 | package com.aaron.reactive.common; |
API接口统一包装类 ApiMultiResult 如下
1 | package com.aaron.reactive.common; |
API接口统一包装类 ApiStreamResult 如下
1 | package com.aaron.reactive.common; |
启动类如下
1 | package com.aaron.reactive; |
CRUD
这里来实现用户的增删改查需求,建表语句如下
1 | CREATE TABLE IF NOT EXISTS `user_info` ( |
用户Entity实体类定义如下。其中,使用@Version的字段有两个作用:
- 通过是否为NULL,用于save等方法判定插入/更新。如果为NULL,则新增记录;如果非NULL;则更新记录。此外新增时,对于主键字段而言,如果为NULL,则insert sql语句中不包含主键字段。例如:使用数据库的自增id;如果非NULL,则insert sql语句中包含主键字段。例如:使用uuid算法、雪花算法等自定义主键
- 用于实现乐观锁。新增插入时,内置方法生成的sql语句会将该字段值会设置为0;更新修改时,内置方法生成的sql语句会在where条件中对该字段进行版本校验,并在set语句中让version值加1
1 | package com.aaron.reactive.entity; |
用户Repository实现如下
1 | package com.aaron.reactive.repository; |
实现Service、Controller前,先提供一个创建用户所需的Dto
1 | package com.aaron.reactive.dto; |
用户Controller实现如下
1 | package com.aaron.reactive.controller; |
用户Service实现如下
1 | package com.aaron.reactive.service; |
UT
针对用户的单元测试UT实现如下
1 | package com.aaron.reactive.controller; |