A Brief Introduction to Reactive Programming: Spring WebFlux

This article introduces Reactive Programming and puts it into practice with Spring WebFlux.

Signal vs. Element

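Signals are the set of events defined by the Reactive Streams specification; they carry the asynchronous communication between a Publisher and a Subscriber. An Element is a business data item transmitted through the stream. Under the specification, an element cannot exist or travel on its own: it appears only as the payload of an onNext signal and is pushed to the Subscriber through that signal. A reactive stream is therefore, in essence, an asynchronous sequence of signals, and those signals drive both the transfer of data (elements) and the state transitions of the stream.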

Signals fall into two categories: those sent by the Publisher and those sent by the Subscriber. The common signals are:

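  • onSubscribe (subscription established):
    Direction: Publisher → Subscriber
    Payload: a Subscription instance
    Notes:
    1. A handshake signal that the Publisher sends to the Subscriber when the subscription between them is established. It carries a Subscription instance as its payload, which the Subscriber later uses to send request/cancel signals to the Publisher
    2. For each Subscriber, onSubscribe may occur only once, and it must be the first signal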
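  • onNext (data element):
    Direction: Publisher → Subscriber
    Payload: one data element of type T
    Notes:
    1. Used by the Publisher to push one data element to the Subscriber asynchronously. "Emitting an element" therefore really means emitting an onNext signal that carries that element
    2. May occur zero or more times, but never more often than the Subscriber has requested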
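  • onComplete (normal completion):
    Direction: Publisher → Subscriber
    Payload: none
    Notes:
    1. Indicates that the Publisher has published all of its data without any error
    2. Marks the stream as terminated successfully
    3. After sending onComplete, the Publisher must not send any further signal
    4. Occurs at most once
    5. onError and onComplete are mutually exclusive: at most one of them may occur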
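  • onError (abnormal termination):
    Direction: Publisher → Subscriber
    Payload: a Throwable error object
    Notes:
    1. Indicates that an error occurred while the stream was being processed. The error is propagated as an event in the stream rather than through the traditional exception-throwing mechanism
    2. Carries the Throwable as its payload so that downstream can handle or log the error
    3. Marks the stream as terminated with a failure
    4. After sending onError, the Publisher must not send any further signal
    5. Occurs at most once
    6. onError and onComplete are mutually exclusive: at most one of them may occur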
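  • request (backpressure control):
    Direction: Subscriber → Publisher
    Payload: the number of elements requested (a long)
    Notes:
    1. Sent by the Subscriber to the Publisher through the Subscription instance. It tells the Publisher the maximum number of additional onNext signals the Subscriber is ready to handle
    2. Implements backpressure: control over the data flow shifts from the Publisher to the Subscriber, so the Subscriber can tune the upstream's push rate and volume to its own processing capacity and avoid being overwhelmed by the upstream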
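  • cancel (cancellation):
    Direction: Subscriber → Publisher
    Payload: none
    Notes:
    1. Sent by the Subscriber to the Publisher through the Subscription instance. It tells the Publisher that the Subscriber is no longer interested in any further signal and that the subscription should end early. This lets the Subscriber end the stream on its own initiative before the upstream sends onComplete/onError
    2. On receiving this signal, the Publisher should stop sending signals as soon as possible and release the related resources
    3. May be sent more than once, so it must be idempotent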
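
The signal rules above can be traced without any reactive library. The following is a minimal sketch, assuming the JDK's `java.util.concurrent.Flow` interfaces (which mirror the Reactive Streams ones) stand in for Reactor; `SignalTrace` and `RangePublisher` are hypothetical names made up for this illustration, and the publisher is deliberately synchronous so the order of signals is easy to read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class SignalTrace {

    /** Hypothetical synchronous publisher that emits start, start+1, ... (count items in total). */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            // onSubscribe is the first signal; its payload is the Subscription.
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand = demand + n < 0 ? Long.MAX_VALUE : demand + n; // saturate on overflow
                    if (emitting) {
                        return; // re-entrant request from inside onNext: the loop below picks it up
                    }
                    emitting = true;
                    while (!done && demand > 0 && next < start + count) {
                        demand--;
                        subscriber.onNext(next++); // the element travels as the payload of onNext
                    }
                    emitting = false;
                    if (!done && next == start + count) {
                        done = true;
                        subscriber.onComplete(); // terminal signal, sent at most once
                    }
                }

                @Override
                public void cancel() {
                    done = true; // idempotent: repeated calls change nothing
                }
            });
        }
    }

    /** Subscribes with a demand of one element at a time and records every signal in order. */
    static List<String> run() {
        List<String> signals = new ArrayList<>();
        new RangePublisher(10, 3).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                signals.add("onSubscribe");
                subscription = s;
                signals.add("request(1)");
                s.request(1);
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
                signals.add("request(1)");
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError(" + t.getMessage() + ")");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the subscriber asks for one element at a time, every onNext in the trace is preceded by a request(1): the Subscriber, not the Publisher, sets the pace.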

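It is worth emphasizing that the Publisher and Subscriber roles do not refer only to the two ends of a reactive chain. An operator in the middle plays both roles: it subscribes to its upstream and acts as a new Publisher for its downstream to subscribe to. Take the following code as an example: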

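  1. Flux.range(): sits at the very top of the reactive chain and is the Publisher for the downstream delayElements()
  2. delayElements(): sits in the middle of the chain. Towards its upstream Flux.range() it acts as a Subscriber; towards its downstream map() it acts as a Publisher
  3. map(): sits in the middle of the chain. Towards its upstream delayElements() it acts as a Subscriber; towards its downstream subscribe() it acts as a Publisher
  4. subscribe(): sits at the very bottom of the chain and is the Subscriber of the upstream map()

Flux.range(10, 3)
    .delayElements(Duration.ofSeconds(5))
    .map(num -> num * 2)
    .subscribe();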
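
To make the "operator is both Subscriber and Publisher" point concrete, here is a minimal sketch of a map-like operator written against the JDK's `java.util.concurrent.Flow` interfaces instead of Reactor. The names `OperatorRoles`, `range` and `map` are hypothetical helpers for this illustration and are not Reactor's implementation; the delay step is left out so the example stays synchronous.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

class OperatorRoles {

    /** Top of the chain: a Publisher only. Emits start .. start+count-1 synchronously. */
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return; // sketch only: a compliant publisher signals onError for n <= 0
                }
                demand = demand + n < 0 ? Long.MAX_VALUE : demand + n;
                if (emitting) {
                    return;
                }
                emitting = true;
                while (!done && demand > 0 && next < start + count) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (!done && next == start + count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Middle of the chain: a Subscriber towards upstream and a Publisher towards downstream. */
    static <T, R> Flow.Publisher<R> map(Flow.Publisher<T> upstream,
                                        Function<? super T, ? extends R> mapper) {
        return downstream -> upstream.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;
            private boolean done;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                downstream.onSubscribe(s); // request/cancel from downstream go straight upstream
            }

            @Override
            public void onNext(T item) {
                if (done) {
                    return;
                }
                R mapped;
                try {
                    mapped = mapper.apply(item);
                } catch (RuntimeException ex) {
                    done = true;
                    subscription.cancel();  // stop the upstream
                    downstream.onError(ex); // the failure travels on as an onError signal
                    return;
                }
                downstream.onNext(mapped);
            }

            @Override
            public void onError(Throwable t) {
                if (!done) {
                    done = true;
                    downstream.onError(t);
                }
            }

            @Override
            public void onComplete() {
                if (!done) {
                    done = true;
                    downstream.onComplete();
                }
            }
        });
    }

    /** Bottom of the chain: a Subscriber only. Returns the signals it received. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        Flow.Publisher<Integer> chain = map(range(10, 3), num -> num * 2);
        chain.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { received.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { received.add("onError"); }
            @Override public void onComplete() { received.add("onComplete"); }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onNext(20), onNext(22), onNext(24), onComplete]
    }
}
```

Nothing happens until the final `subscribe` call: the subscription travels up through `map` to `range`, and the signals then travel back down the same path.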

The following code shows in detail how the various signals actually interact in a reactive stream:

import java.time.Duration;
import reactor.core.publisher.Flux;

public class SignalDemo {

    public static void main(String[] args) throws Exception {
        title("正常完成的流"); // a stream that completes normally
        normalFlux();

        title("发生错误的流"); // a stream that fails with an error
        errorFlux();

        title("被取消的流"); // a stream that gets cancelled
        cancelFlux();
    }

    /**
     * A stream that completes normally
     * @throws Exception
     */
    public static void normalFlux() throws Exception {
        Flux.range(10, 3)
            // log() operator: intercepts and records every signal that passes through its position
            .log("Normal.Flux #1")
            .delayElements(Duration.ofSeconds(5))
            .log("Normal.Flux #2")
            .subscribe();

        // Keep the main thread alive while the delayed elements arrive
        Thread.sleep(1000 * 20);
    }

    /**
     * A stream that fails with an error
     * @throws Exception
     */
    public static void errorFlux() throws Exception {
        Flux.interval(Duration.ofSeconds(5))
            .log("Error.Flux #1")
            .map(num -> {
                if (num == 1) {
                    throw new IllegalArgumentException("Error Num");
                }
                return num;
            })
            .log("Error.Flux #2")
            .subscribe(v -> {}, e -> {});

        Thread.sleep(1000 * 20);
    }

    /**
     * A stream that gets cancelled
     * @throws InterruptedException
     */
    public static void cancelFlux() throws InterruptedException {
        Flux.interval(Duration.ofSeconds(5))
            .log("Cancel.Flux #1")
            .take(3)
            .log("Cancel.Flux #2")
            .subscribe();

        Thread.sleep(1000 * 20);
    }

    public static void title(String title) {
        String separator = "=".repeat(30);
        System.out.println("\n\n" + separator + " " + title + " " + separator);
    }
}

The output is as follows:

============================== 正常完成的流 ==============================
17:40:33.637 [main] INFO Normal.Flux #1 -- | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
17:40:33.639 [main] INFO Normal.Flux #2 -- onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
17:40:33.641 [main] INFO Normal.Flux #2 -- request(unbounded)
17:40:33.641 [main] INFO Normal.Flux #1 -- | request(1)
17:40:33.641 [main] INFO Normal.Flux #1 -- | onNext(10)
17:40:38.659 [parallel-1] INFO Normal.Flux #2 -- onNext(10)
17:40:38.660 [parallel-1] INFO Normal.Flux #1 -- | request(1)
17:40:38.660 [parallel-1] INFO Normal.Flux #1 -- | onNext(11)
17:40:43.665 [parallel-2] INFO Normal.Flux #2 -- onNext(11)
17:40:43.666 [parallel-2] INFO Normal.Flux #1 -- | request(1)
17:40:43.666 [parallel-2] INFO Normal.Flux #1 -- | onNext(12)
17:40:43.671 [parallel-2] INFO Normal.Flux #1 -- | onComplete()
17:40:48.667 [parallel-3] INFO Normal.Flux #2 -- onNext(12)
17:40:48.667 [parallel-3] INFO Normal.Flux #2 -- onComplete()


============================== 发生错误的流 ==============================
17:40:53.661 [main] INFO Error.Flux #1 -- onSubscribe(FluxInterval.IntervalRunnable)
17:40:53.662 [main] INFO Error.Flux #2 -- onSubscribe(FluxMap.MapSubscriber)
17:40:53.662 [main] INFO Error.Flux #2 -- request(unbounded)
17:40:53.662 [main] INFO Error.Flux #1 -- request(unbounded)
17:40:58.667 [parallel-4] INFO Error.Flux #1 -- onNext(0)
17:40:58.669 [parallel-4] INFO Error.Flux #2 -- onNext(0)
17:41:03.668 [parallel-4] INFO Error.Flux #1 -- onNext(1)
17:41:03.679 [parallel-4] INFO Error.Flux #1 -- cancel()
17:41:03.679 [parallel-4] ERROR Error.Flux #2 -- onError(java.lang.IllegalArgumentException: Error Num)
17:41:03.680 [parallel-4] ERROR Error.Flux #2 --
java.lang.IllegalArgumentException: Error Num
...


============================== 被取消的流 ==============================
17:41:13.673 [main] INFO Cancel.Flux #1 -- onSubscribe(FluxInterval.IntervalRunnable)
17:41:13.673 [main] INFO Cancel.Flux #2 -- onSubscribe(FluxLimitRequest.FluxLimitRequestSubscriber)
17:41:13.673 [main] INFO Cancel.Flux #2 -- request(unbounded)
17:41:13.673 [main] INFO Cancel.Flux #1 -- request(3)
17:41:18.679 [parallel-5] INFO Cancel.Flux #1 -- onNext(0)
17:41:18.680 [parallel-5] INFO Cancel.Flux #2 -- onNext(0)
17:41:23.679 [parallel-5] INFO Cancel.Flux #1 -- onNext(1)
17:41:23.681 [parallel-5] INFO Cancel.Flux #2 -- onNext(1)
17:41:28.678 [parallel-5] INFO Cancel.Flux #1 -- onNext(2)
17:41:28.680 [parallel-5] INFO Cancel.Flux #2 -- onNext(2)
17:41:28.680 [parallel-5] INFO Cancel.Flux #1 -- cancel()
17:41:28.680 [parallel-5] INFO Cancel.Flux #2 -- onComplete()
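
In the last log above, take(3) turns the downstream's request(unbounded) into request(3) towards the upstream; after the third element the upstream receives cancel() while the downstream receives onComplete(). The following is a minimal sketch of that behaviour, assuming the JDK's `java.util.concurrent.Flow` interfaces in place of Reactor. `TakeSketch`, `counter` and `take` are hypothetical names for this illustration and do not reproduce Reactor's actual implementation; the counter also has no timer, unlike Flux.interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class TakeSketch {

    /** Endless synchronous counter (0, 1, 2, ...) that logs the request/cancel signals it receives. */
    static Flow.Publisher<Long> counter(List<String> log) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private long next;
            private long demand;
            private boolean emitting;
            private boolean cancelled;

            @Override
            public void request(long n) {
                log.add("upstream request(" + n + ")");
                if (cancelled || n <= 0) {
                    return; // sketch only: a compliant publisher signals onError for n <= 0
                }
                demand = demand + n < 0 ? Long.MAX_VALUE : demand + n;
                if (emitting) {
                    return;
                }
                emitting = true;
                while (!cancelled && demand > 0) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                log.add("upstream cancel()");
                cancelled = true;
            }
        });
    }

    /** Lets at most limit elements through, then cancels upstream and completes downstream. */
    static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long limit) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;
            private long received;
            private boolean done;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                upstream = s;
                downstream.onSubscribe(new Flow.Subscription() {
                    private long requested;

                    @Override
                    public void request(long n) {
                        // Never ask the upstream for more than the limit in total
                        long allowed = Math.min(n, limit - requested);
                        if (allowed > 0) {
                            requested += allowed;
                            s.request(allowed);
                        }
                    }

                    @Override
                    public void cancel() {
                        s.cancel();
                    }
                });
            }

            @Override
            public void onNext(T item) {
                if (done) {
                    return;
                }
                downstream.onNext(item);
                if (++received == limit) {
                    done = true;
                    upstream.cancel();       // no more elements are needed
                    downstream.onComplete(); // downstream sees a normal completion
                }
            }

            @Override
            public void onError(Throwable t) {
                if (!done) {
                    done = true;
                    downstream.onError(t);
                }
            }

            @Override
            public void onComplete() {
                if (!done) {
                    done = true;
                    downstream.onComplete();
                }
            }
        });
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        take(counter(log), 3).subscribe(new Flow.Subscriber<Long>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Long item) { log.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete()"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded order (request(3), three onNext signals, cancel(), onComplete()) has the same shape as the Cancel.Flux log: the stream is "cancelled" from the upstream's point of view and "completed" from the downstream's.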

Engineering Practice

Scaffolding

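The project is built on Spring Boot WebFlux; the POM dependencies are listed below. Key points: add the Spring Boot WebFlux starter, and for database connections use R2DBC rather than traditional JDBC, whose driver API is blocking.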

<dependencyManagement>
    <dependencies>
        <!--Spring Boot-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>3.2.5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!--Spring Boot WebFlux-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!--Spring Data R2DBC-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>

    <!--MySQL R2DBC-->
    <dependency>
        <groupId>io.asyncer</groupId>
        <artifactId>r2dbc-mysql</artifactId>
        <scope>runtime</scope>
    </dependency>

    <!--Lombok-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <!--Tool-->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.1</version>
    </dependency>

    <!-- Unit Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!--FastJson-->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.58</version>
    </dependency>

</dependencies>

The configuration file is as follows:

server:
  port: 6969

spring:
  application:
    name: reactive
  r2dbc:
    url: r2dbc:mysql://localhost:3306/db1
    username: root
    password: 52996
    pool:
      # Enable the connection pool
      enabled: true

The unified API response wrapper ApiSingleResult is as follows:

package com.aaron.reactive.common;

import lombok.AllArgsConstructor;
import lombok.Data;

@AllArgsConstructor
@Data
public class ApiSingleResult<T> {
    private int code;

    private boolean success;

    private String message;

    private T data;

    public static <T> ApiSingleResult<T> success(T data) {
        return new ApiSingleResult<>(0, true, null, data);
    }

    public static <T> ApiSingleResult<T> error() {
        return new ApiSingleResult<>(-1, false, null, null);
    }

    public static <T> ApiSingleResult<T> error(int code) {
        return new ApiSingleResult<>(code, false, null, null);
    }

    public static <T> ApiSingleResult<T> error(String message) {
        return new ApiSingleResult<>(-1, false, message, null);
    }

    public static <T> ApiSingleResult<T> error(int code, String message) {
        return new ApiSingleResult<>(code, false, message, null);
    }

}

The unified API response wrapper ApiMultiResult is as follows:

package com.aaron.reactive.common;

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;


@AllArgsConstructor
@Data
public class ApiMultiResult<T> {
    private int code;

    private boolean success;

    private String message;

    private Long total;

    private List<T> data;

    public static <T> ApiMultiResult<T> success(List<T> data) {
        return new ApiMultiResult<>(0, true, null, null, data);
    }

    public static <T> ApiMultiResult<T> success(Long total, List<T> data) {
        return new ApiMultiResult<>(0, true, null, total, data);
    }

    public static <T> ApiMultiResult<T> error(int code) {
        return new ApiMultiResult<>(code, false, null, null, null);
    }

    public static <T> ApiMultiResult<T> error(String message) {
        return new ApiMultiResult<>(-1, false, message, null, null);
    }

    public static <T> ApiMultiResult<T> error(int code, String message) {
        return new ApiMultiResult<>(code, false, message, null, null);
    }

}

The unified API response wrapper ApiStreamResult is as follows:

package com.aaron.reactive.common;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;

@Data
@AllArgsConstructor
public class ApiStreamResult<T> {
    private EventType eventType;

    private T data;

    private int errorCode;

    private String errorMessage;

    public static <T> ApiStreamResult<T> start() {
        return new ApiStreamResult<>(EventType.STREAM_START, null, 0, null);
    }

    public static <T> ApiStreamResult<T> data(T data) {
        return new ApiStreamResult<>(EventType.DATA, data, 0, null);
    }

    public static <T> ApiStreamResult<T> end() {
        return new ApiStreamResult<>(EventType.STREAM_END, null, 0, null);
    }

    public static <T> ApiStreamResult<T> error(int errorCode) {
        return new ApiStreamResult<>(EventType.STREAM_ERROR, null, errorCode, null);
    }

    public static <T> ApiStreamResult<T> error(String errorMessage) {
        return new ApiStreamResult<>(EventType.STREAM_ERROR, null, -1, errorMessage);
    }

    public static <T> ApiStreamResult<T> error(int errorCode, String errorMessage) {
        return new ApiStreamResult<>(EventType.STREAM_ERROR, null, errorCode, errorMessage);
    }

    @AllArgsConstructor
    @Getter
    public static enum EventType {
        STREAM_START("stream start event"),

        DATA("data event"),

        STREAM_END("stream end event"),

        STREAM_ERROR("stream error event"),

        ;

        private String desc;
    }
}

The application entry class is as follows:

package com.aaron.reactive;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class ReactiveApplication {
    public static void main(String[] args) {
        SpringApplication.run(ReactiveApplication.class, args);
    }
}

CRUD

This section implements create, read, update and delete operations for users. The table definition is as follows:

CREATE TABLE IF NOT EXISTS `user_info` (
    id bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
    gmt_create datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
    gmt_modified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modification time',

    name varchar(64) DEFAULT NULL COMMENT 'Name',
    sex varchar(32) DEFAULT NULL COMMENT 'Sex',
    age int DEFAULT NULL COMMENT 'Age',
    country varchar(64) DEFAULT NULL COMMENT 'Country',
    is_vip boolean DEFAULT NULL COMMENT 'Whether the user is a VIP member',
    remark text DEFAULT NULL COMMENT 'Remark',

    version bigint unsigned NOT NULL COMMENT 'Version',
    is_deleted tinyint unsigned NOT NULL DEFAULT '0' COMMENT 'Logical deletion (0 = active, 1 = deleted)',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='User';

The User entity class is defined as follows. The field annotated with @Version serves two purposes:

  1. Whether it is NULL lets methods such as save decide between insert and update: if it is NULL, a new record is inserted; if it is non-NULL, the existing record is updated. In addition, on insert, if the primary key field is NULL, the INSERT statement omits the primary key column (for example, when the database's auto-increment id is used); if it is non-NULL, the INSERT statement includes the primary key column (for example, when the key comes from a custom scheme such as UUID or the snowflake algorithm)
  2. It implements optimistic locking. On insert, the SQL generated by the built-in methods sets the field to 0; on update, the generated SQL checks the version in the WHERE clause and increments it by 1 in the SET clause

package com.aaron.reactive.entity;

import java.time.LocalDateTime;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.ReadOnlyProperty;
import org.springframework.data.annotation.Version;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;


// Table name mapping
@Table(name = "user_info")
@Data
public class User {
    // Marks the primary key
    @Id
    private Long id;

    // Column name mapping
    @Column("gmt_create")
    // Read-only field: the database sets it automatically on insert and update
    @ReadOnlyProperty
    private LocalDateTime createTime;

    // Column name mapping
    @Column("gmt_modified")
    // Read-only field: the database sets it automatically on insert and update
    @ReadOnlyProperty
    private LocalDateTime updateTime;

    private String name;

    private String sex;

    private Integer age;

    private String country;

    private Boolean isVip;

    private String remark;

    // Version field, used for optimistic locking
    @Version
    private Long version;

    /**
     * 0 means active, 1 means deleted
     */
    // Column name mapping
    @Column("is_deleted")
    private Byte isDel = 0;
}
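
The version check behind optimistic locking can be shown without a database. The following is a minimal sketch, assuming a hypothetical in-memory table (`OptimisticLockSketch` and `Row` are made-up names, not part of Spring Data); its `updateRemark` method imitates an `UPDATE ... SET version=version+1 WHERE id=? AND version=?` statement and returns the affected-row count.

```java
import java.util.HashMap;
import java.util.Map;

class OptimisticLockSketch {

    /** Hypothetical in-memory row holding only the remark and the version column. */
    static final class Row {
        String remark;
        long version;

        Row(String remark, long version) {
            this.remark = remark;
            this.version = version;
        }
    }

    private final Map<Long, Row> table = new HashMap<>();

    /** Insert: the version starts at 0, as described above for the generated INSERT. */
    void insert(long id, String remark) {
        table.put(id, new Row(remark, 0L));
    }

    /**
     * Imitates "UPDATE ... SET remark=?, version=version+1 WHERE id=? AND version=?".
     * Returns the number of affected rows: 1 on success, 0 if the caller's version is stale.
     */
    int updateRemark(long id, String remark, long expectedVersion) {
        Row row = table.get(id);
        if (row == null || row.version != expectedVersion) {
            return 0;
        }
        row.remark = remark;
        row.version++;
        return 1;
    }

    long versionOf(long id) {
        return table.get(id).version;
    }

    public static void main(String[] args) {
        OptimisticLockSketch db = new OptimisticLockSketch();
        db.insert(1L, "first");

        // Two callers read the same row and therefore see the same version
        long seenByA = db.versionOf(1L);
        long seenByB = db.versionOf(1L);

        System.out.println(db.updateRemark(1L, "from A", seenByA)); // 1: A wins
        System.out.println(db.updateRemark(1L, "from B", seenByB)); // 0: B's version is stale
        System.out.println(db.versionOf(1L));                       // 1
    }
}
```

The second writer is not blocked; it simply affects zero rows and has to re-read the record and retry.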

The User repository is implemented as follows:

package com.aaron.reactive.repository;

import com.aaron.reactive.entity.User;
import org.springframework.data.domain.Pageable;
import org.springframework.data.r2dbc.repository.Modifying;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


@Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
    /**
     * Find by ID
     * @param id
     * @return
     */
    @Query("SELECT * FROM user_info WHERE is_deleted=0 and id=:id")
    Mono<User> fetchById(@Param("id") Long id);

    /**
     * Find all
     * @return
     */
    @Query("SELECT * FROM user_info WHERE is_deleted=0")
    Flux<User> findAll();

    /**
     * Paged query: count the total
     * @return
     */
    @Query("SELECT count(1) FROM user_info WHERE is_deleted=0")
    Mono<Long> count4PageQuery();

    /**
     * Paged query: fetch the list
     * @param pageable
     * @return
     */
    @Query("SELECT * FROM user_info WHERE is_deleted=0 ORDER BY id ASC LIMIT :#{#pageable.pageSize} OFFSET :#{#pageable.offset}")
    Flux<User> pageQuery(@Param("pageable") Pageable pageable);

    /**
     * Update the remark
     * @param id
     * @param remark
     * @param version the version read earlier, used for the optimistic-lock check
     * @return the number of affected rows
     */
    @Modifying
    @Query("UPDATE user_info SET remark=:remark, version=version+1 WHERE is_deleted=0 and id=:id and version=:version")
    Mono<Integer> updateRemark(@Param("id") Long id, @Param("remark") String remark, @Param("version") Long version);

    /**
     * Logical deletion
     * @param id
     * @param version the version read earlier, used for the optimistic-lock check
     * @return the number of affected rows
     */
    @Modifying
    @Query("UPDATE user_info SET is_deleted=1, version=version+1 WHERE id=:id and version=:version")
    Mono<Integer> delete(@Param("id") Long id, @Param("version") Long version);
}

Before implementing the Service and the Controller, here is the DTO needed to create a user:

package com.aaron.reactive.dto;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;


@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class CreateUserDto {
    private String name;

    private String sex;

    private Integer age;

    private String country;

    private Boolean isVip;

    private String remark;
}

The User controller is implemented as follows:

package com.aaron.reactive.controller;

import java.time.Duration;
import java.time.LocalTime;
import com.alibaba.fastjson2.JSON;
import com.aaron.reactive.common.ApiMultiResult;
import com.aaron.reactive.common.ApiSingleResult;
import com.aaron.reactive.common.ApiStreamResult;
import com.aaron.reactive.dto.CreateUserDto;
import com.aaron.reactive.entity.User;
import com.aaron.reactive.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {
    @Autowired
    private UserService userService;

    /**
     * Create a user
     * @param createUserDto
     * @return
     */
    @PostMapping("/create")
    public Mono<ApiSingleResult<User>> createUser(@RequestBody CreateUserDto createUserDto) {
        log.info("UserController createUser param : {}", JSON.toJSONString(createUserDto));
        Mono<User> userMono = userService.createUser(createUserDto);
        Mono<ApiSingleResult<User>> resultMono = userMono.map(user -> ApiSingleResult.success(user))
            .doOnSuccess(res -> log.info("UserController createUser res: {}", JSON.toJSONString(res)))
            .onErrorResume(ex -> {
                log.error("UserController createUser happen ex: ", ex);
                return Mono.just(ApiSingleResult.error(ex.getMessage()));
            });
        return resultMono;
    }

    /**
     * Find a user by ID
     * @param id
     * @return
     */
    @GetMapping("/fetchById")
    public Mono<ApiSingleResult<User>> fetchById(@RequestParam Long id) {
        log.info("UserController fetchById param : {}", id);
        if (id == null) {
            return Mono.just(ApiSingleResult.error("Illegal Argument"));
        }

        Mono<User> userMono = userService.fetchById(id);
        Mono<ApiSingleResult<User>> resultMono = userMono.map(user -> ApiSingleResult.success(user))
            .doOnSuccess(res -> log.info("UserController fetchById res: {}", JSON.toJSONString(res)))
            .onErrorResume(ex -> {
                log.error("UserController fetchById happen ex: ", ex);
                return Mono.just(ApiSingleResult.error(ex.getMessage()));
            });
        return resultMono;
    }

    /**
     * Paged query
     * @param pageNo
     * @param pageSize
     * @return
     */
    @GetMapping("/pageQuery")
    public Mono<ApiMultiResult<User>> pageQuery(@RequestParam Integer pageNo, @RequestParam Integer pageSize) {
        log.info("UserController pageQuery param : {} | {}", pageNo, pageSize);
        // Fall back to page 1 and a page size of 50 for out-of-range values
        pageNo = pageNo < 1 ? 1 : pageNo;
        pageSize = pageSize < 1 ? 50 : pageSize;

        Mono<ApiMultiResult<User>> apiMultiResultMono = userService.pageQuery(pageNo, pageSize)
            .doOnSuccess(res -> log.info("UserController pageQuery res: {}", JSON.toJSONString(res)))
            .onErrorResume(ex -> {
                log.error("UserController pageQuery happen ex: ", ex);
                return Mono.just(ApiMultiResult.error(ex.getMessage()));
            });
        return apiMultiResultMono;
    }

    /**
     * Update the remark
     * @param id
     * @param remark
     * @return
     */
    @GetMapping("/updateRemark")
    public Mono<ApiSingleResult<User>> updateRemark(@RequestParam Long id, @RequestParam String remark) {
        log.info("UserController updateRemark param : {} | {}", id, remark);
        if (id == null) {
            return Mono.just(ApiSingleResult.error("Illegal Argument"));
        }

        Mono<User> userMono = userService.updateRemark(id, remark);
        Mono<ApiSingleResult<User>> resultMono = userMono.map(user -> ApiSingleResult.success(user))
            .doOnSuccess(res -> log.info("UserController updateRemark res: {}", JSON.toJSONString(res)))
            .onErrorResume(ex -> {
                log.error("UserController updateRemark happen ex : ", ex);
                return Mono.just(ApiSingleResult.error(ex.getMessage()));
            });
        return resultMono;
    }

    /**
     * Delete a user
     * @param id
     * @return
     */
    @GetMapping("/delete")
    public Mono<ApiSingleResult<Integer>> deleteUser(@RequestParam Long id) {
        log.info("UserController deleteUser param : {}", id);
        if (id == null) {
            return Mono.just(ApiSingleResult.error("Illegal Argument"));
        }

        Mono<Integer> intMono = userService.deleteUser(id);
        Mono<ApiSingleResult<Integer>> resultMono = intMono.map(count -> ApiSingleResult.success(count))
            .doOnSuccess(res -> log.info("UserController deleteUser res : {}", JSON.toJSONString(res)))
            .onErrorResume(ex -> {
                log.error("UserController deleteUser happen ex : ", ex);
                return Mono.just(ApiSingleResult.error(ex.getMessage()));
            });

        return resultMono;
    }

    /**
     * Stream all users
     * @return
     */
    @GetMapping(value = "/findAllByStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ApiStreamResult<User>> findAllByStream() {
        Flux<User> userFlux = userService.findAll();

        // Wrap each User in a data event
        Flux<ApiStreamResult<User>> dataEvents = userFlux.map(user -> ApiStreamResult.data(user))
            .delayElements(Duration.ofSeconds(2));

        // Create the start event and the end event
        Flux<ApiStreamResult<User>> startEvent = Flux.just(ApiStreamResult.start());
        Flux<ApiStreamResult<User>> endEvent = Flux.just(ApiStreamResult.end());

        Flux<ApiStreamResult<User>> resultFlux = Flux.concat(startEvent, dataEvents, endEvent)
            .doOnNext(event -> log.info("UserController findAllByStream send event: [{}] -->> {}",
                LocalTime.now(), JSON.toJSONString(event)))
            .onErrorResume(ex -> {
                log.error("UserController findAllByStream happen ex: ", ex);
                // Send an error event
                return Flux.just(ApiStreamResult.error(ex.getMessage()));
            });

        return resultFlux;
    }
}
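
One consequence of the findAllByStream pipeline is easy to miss: because onErrorResume replaces the remainder of the concatenated sequence, a failure in the data part produces a STREAM_ERROR event and the STREAM_END event is never sent. The following is a blocking, plain-Java analogue of that event order, written as a minimal sketch; `StreamEventsSketch` and its string-based events are hypothetical stand-ins for the ApiStreamResult objects, not the controller's actual code path.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class StreamEventsSketch {

    /** Blocking analogue of concat(start, data, end) followed by an error fallback. */
    static List<String> events(Iterator<String> users) {
        List<String> out = new ArrayList<>();
        out.add("STREAM_START");
        try {
            while (users.hasNext()) {
                out.add("DATA:" + users.next());
            }
            out.add("STREAM_END");
        } catch (RuntimeException ex) {
            // Like onErrorResume: the rest of the sequence is replaced by a single error event
            out.add("STREAM_ERROR:" + ex.getMessage());
        }
        return out;
    }

    /** Hypothetical source that fails after yielding its first element. */
    static Iterator<String> failingAfterFirst() {
        return new Iterator<String>() {
            private int index;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public String next() {
                if (index++ == 0) {
                    return "alice";
                }
                throw new IllegalStateException("connection lost");
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(events(List.of("alice", "bob").iterator()));
        // [STREAM_START, DATA:alice, DATA:bob, STREAM_END]
        System.out.println(events(failingAfterFirst()));
        // [STREAM_START, DATA:alice, STREAM_ERROR:connection lost]
    }
}
```

A client consuming the stream should therefore treat either STREAM_END or STREAM_ERROR as the last event.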

The User service is implemented as follows:

package com.aaron.reactive.service;

import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.aaron.reactive.common.ApiMultiResult;
import com.aaron.reactive.dto.CreateUserDto;
import com.aaron.reactive.entity.User;
import com.aaron.reactive.repository.UserRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


@Slf4j
@Service
public class UserService {
    @Autowired
    private UserRepository userRepository;

    public Mono<User> createUser(CreateUserDto createUserDto) {
        log.info("UserService createUser param: {}", JSON.toJSONString(createUserDto));
        User user = new User();
        BeanUtils.copyProperties(createUserDto, user);
        Mono<User> userMono = userRepository.save(user)
            .doOnSuccess(savedUser -> log.info("UserService createUser savedUser: {}", JSON.toJSONString(savedUser)))
            .doOnError(ex -> log.error("UserService createUser happen ex: ", ex));
        return userMono;
    }

    public Mono<User> fetchById(Long id) {
        log.info("UserService fetchById param: {}", id);
        Mono<User> userMono = userRepository.fetchById(id)
            .doOnSuccess(user -> log.info("UserService fetchById user: {}", JSON.toJSONString(user)))
            .doOnError(ex -> log.error("UserService fetchById happen ex: ", ex));
        return userMono;
    }

    public Mono<ApiMultiResult<User>> pageQuery(Integer pageNo, Integer pageSize) {
        log.info("UserService pageQuery param: {} | {}", pageNo, pageSize);

        // Query the total
        Mono<Long> totalMono = userRepository.count4PageQuery()
            .doOnSuccess(total -> log.info("UserService pageQuery total: {}", total))
            .doOnError(ex -> log.error("UserService pageQuery total happen ex: ", ex));

        // Query the list; PageRequest pages are 0-based, pageNo is 1-based
        Pageable pageable = PageRequest.of(pageNo - 1, pageSize);
        Mono<List<User>> userListMono = userRepository.pageQuery(pageable)
            .collectList()
            .doOnSuccess(users -> log.info("UserService pageQuery users: {}", JSON.toJSONString(users)))
            .doOnError(ex -> log.error("UserService pageQuery users happen ex: ", ex));

        // Combine the two results once both are available
        Mono<ApiMultiResult<User>> apiMultiResultMono = Mono.zip(totalMono, userListMono)
            .map(tuple -> {
                Long total = tuple.getT1();
                List<User> userList = tuple.getT2();
                return ApiMultiResult.success(total, userList);
            })
            .doOnSuccess(userApiMultiResult -> log.info("UserService pageQuery res: {}", JSON.toJSONString(userApiMultiResult)))
            .doOnError(ex -> log.error("UserService pageQuery happen ex: ", ex));

        return apiMultiResultMono;
    }

    public Mono<User> updateRemark(Long id, String remark) {
        log.info("UserService updateRemark param: {} | {}", id, remark);
        Mono<User> userMono = userRepository.fetchById(id)
            .doOnNext(user -> log.info("UserService updateRemark find user: {}", JSON.toJSONString(user)))
            .switchIfEmpty(Mono.error(new RuntimeException("User Not Found")))
            // Pass the version that was just read so the UPDATE can do its optimistic-lock check
            .flatMap(user -> userRepository.updateRemark(id, remark, user.getVersion()))
            .flatMap(count -> {
                log.info("UserService updateRemark count : {}", count);
                if (count > 0) {
                    return userRepository.fetchById(id);
                } else {
                    return Mono.error(new RuntimeException("Update Fail, Please Retry"));
                }
            })
            .doOnSuccess(updateUser -> log.info("UserService updateRemark updated User: {}", JSON.toJSONString(updateUser)))
            .doOnError(ex -> log.error("UserService updateRemark happen ex: ", ex));
        return userMono;
    }

    public Mono<Integer> deleteUser(Long id) {
        log.info("UserService deleteUser param: {}", id);
        Mono<Integer> resMono = userRepository.fetchById(id)
            .doOnNext(user -> log.info("UserService deleteUser find user: {}", JSON.toJSONString(user)))
            .switchIfEmpty(Mono.error(new RuntimeException("User Not Found")))
            .flatMap(user -> userRepository.delete(id, user.getVersion()))
            .doOnSuccess(count -> log.info("UserService deleteUser res: {}", count))
            .doOnError(ex -> log.error("UserService deleteUser happen ex: ", ex));
        return resMono;
    }

    public Flux<User> findAll() {
        Flux<User> userFlux = userRepository.findAll()
            .doOnNext(user -> log.info("UserService findAllByStream user: {}", JSON.toJSONString(user)))
            .doOnError(ex -> log.error("UserService findAllByStream happen ex: ", ex));
        return userFlux;
    }
}
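
The paged query relies on a small piece of arithmetic: the controller accepts a 1-based pageNo, the service converts it to a 0-based page, and the SQL receives LIMIT pageSize and OFFSET page × pageSize. The following is a minimal sketch of that calculation in plain Java; `PageMath` and `limitAndOffset` are hypothetical names, and the fallbacks (page 1, size 50) are copied from the controller above.

```java
class PageMath {

    static final int DEFAULT_PAGE_SIZE = 50;

    /**
     * Returns {limit, offset} for a 1-based page number,
     * applying the same fallbacks as the controller.
     */
    static long[] limitAndOffset(int pageNo, int pageSize) {
        int safePageNo = pageNo < 1 ? 1 : pageNo;
        int safePageSize = pageSize < 1 ? DEFAULT_PAGE_SIZE : pageSize;
        // 0-based page index multiplied by the page size; long avoids int overflow
        long offset = (long) (safePageNo - 1) * safePageSize;
        return new long[] { safePageSize, offset };
    }

    public static void main(String[] args) {
        long[] first = limitAndOffset(1, 3);
        System.out.println("LIMIT " + first[0] + " OFFSET " + first[1]); // LIMIT 3 OFFSET 0

        long[] third = limitAndOffset(3, 20);
        System.out.println("LIMIT " + third[0] + " OFFSET " + third[1]); // LIMIT 20 OFFSET 40
    }
}
```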

UT

The unit tests (UT) for the user endpoints are implemented as follows:

package com.aaron.reactive.controller;

import com.alibaba.fastjson2.JSON;
import com.aaron.reactive.ReactiveApplication;
import com.aaron.reactive.common.ApiMultiResult;
import com.aaron.reactive.common.ApiSingleResult;
import com.aaron.reactive.common.ApiStreamResult;
import com.aaron.reactive.dto.CreateUserDto;
import com.aaron.reactive.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.EntityExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;


// Start the web server on a random port: classes specifies the main application class,
// webEnvironment selects the random-port mode
@SpringBootTest(
    classes = {ReactiveApplication.class},
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT
)
@Slf4j
public class UserControllerTest {
    private final static String URL_PREFIX = "/user";

    @Autowired
    private WebTestClient webTestClient;

    @Test
    public void testCreateUser() {
        CreateUserDto createUserDto = CreateUserDto.builder()
            .name("王二麻子")
            .sex("男")
            .age(69)
            .country("梵蒂冈")
            .isVip(true)
            .build();

        EntityExchangeResult<ApiSingleResult<User>> entityExchangeResult = webTestClient
            // Use the POST method
            .post()
            // Set the URL
            .uri(URL_PREFIX + "/create")
            // Set the content type
            .contentType(MediaType.APPLICATION_JSON)
            // Set the request body
            .bodyValue(createUserDto)
            // Send the request
            .exchange()
            // Assertion: expect HTTP status code 200
            .expectStatus().isOk()
            // Specify the type information for deserialization
            .expectBody(new ParameterizedTypeReference<ApiSingleResult<User>>() {})
            // Block until the response result is available
            .returnResult();

        ApiSingleResult<User> apiSingleResult = entityExchangeResult.getResponseBody();
        log.info("UserControllerTest testCreateUser apiSingleResult: {}", JSON.toJSONString(apiSingleResult));
    }

    @Test
    public void testPageQuery() {
        EntityExchangeResult<ApiMultiResult<User>> entityExchangeResult = webTestClient
            // Use the GET method
            .get()
            // Set the URL and query parameters
            .uri(uriBuilder -> uriBuilder.path(URL_PREFIX + "/pageQuery")
                .queryParam("pageNo", 1)
                .queryParam("pageSize", 3)
                .build()
            ).exchange()
            .expectStatus().isOk()
            // Specify the type information for deserialization
            .expectBody(new ParameterizedTypeReference<ApiMultiResult<User>>() {})
            // Block until the response result is available
            .returnResult();
        ApiMultiResult<User> apiMultiResult = entityExchangeResult.getResponseBody();
        log.info("UserControllerTest testPageQuery apiMultiResult: {}", JSON.toJSONString(apiMultiResult));
    }

    @Test
    public void testFindAllByStream() {
        Flux<ApiStreamResult<User>> apiStreamResultFlux = webTestClient
            .get()
            .uri(URL_PREFIX + "/findAllByStream")
            // Specify the accepted content type: text/event-stream
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .expectStatus().isOk()
            // Assert the content type of the response
            .expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
            .returnResult(new ParameterizedTypeReference<ApiStreamResult<User>>() {})
            .getResponseBody();

        StepVerifier.create(apiStreamResultFlux)
            // Consume every element and log it
            .thenConsumeWhile(
                event -> true,
                event -> log.info("UserControllerTest testFindAllByStream event: {}", JSON.toJSONString(event))
            )
            // Verify that the stream ends normally
            .verifyComplete();
    }

}