0%

Spring WebFlux之创建/终端操作符

这里介绍Spring WebFlux中的创建、终端操作符

abstract.PNG

Marble Diagram 弹珠图

一个典型的Marble Diagram弹珠图由以下几个部分组成:

  • 时间轴:一个水平向右的箭头 表示 随着时间的流逝,数据流中的事件从左到右依次发生
  • Marble弹珠:时间轴上方的各种不同颜色的形状(圆点、方块、三角形等)用于表示数据流发出元素的onNext事件。每个形状均代表一个Data Item数据项
  • 垂直线(|):时间轴上的|垂直线 表示 数据流正常完成的onComplete事件
  • 叉号(x):时间轴上的x叉号 表示 数据流发生错误的onError事件
  • 操作符:一个矩形框。通常位于输入时间轴、输出时间轴之间。表示将上方的输入流转换为下方的输出流
  • 输入流:操作符上方的时间轴 表示 操作符接收的原始数据流
  • 输出流:操作符下方的时间轴 表示 操作符处理后产生的新数据流

下图是onErrorComplete操作符的Marble Diagram弹珠图

figure 1.jpg

创建操作符

Mono、Flux是响应式编程最基础的概念。具体地:Mono用于表示含 0个或1个元素 的响应式流;Flux用于表示含 0个或多个元素 的响应式流。且二者都不允许元素为null。为了方便可视化,辅助打印工具如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


/**
* 辅助打印工具
*/
public class ReactorLogUtil {
private static Duration timeout = Duration.ofSeconds(60);

private static final String separator = "-".repeat(20);

public static <T> void log(String varName, Mono<T> mono) {
Object res = null;
try {
// block(): 阻塞获取
res = mono.block(timeout);
}catch (Exception ex) {
String msg = Optional.ofNullable(ex.getMessage())
.map(String::toLowerCase)
.orElse("");
if( msg.contains("timeout") ) {
res = "<Timeout>";
} else {
res = "<Throw "+ex.toString()+">";
}
}


String msg = new StringBuilder()
.append("<Mono> ")
.append(varName)
.append(" : ")
.append( Optional.ofNullable(res).orElse("<Empty Mono>") )
.toString();
System.out.println(msg);
}

public static <T> void log(String varName, Flux<T> flux) {
String res = null;
try{
Mono<List<T>> listMono = flux.collectList();
// block(): 阻塞获取
List<T> list = listMono.block(timeout);
res = list.toString();
}catch (Exception ex) {
if( ex.getMessage().toLowerCase().contains("timeout") ) {
res = "<Timeout>";
} else {
res = "<Throw "+ex.toString()+">";
}
}
String msg = new StringBuilder()
.append("<Flux> ")
.append(varName)
.append(" : ")
.append( res )
.toString();
System.out.println(msg);
}

public static void separator(String title) {
System.out.println("\n"+separator+" "+title+" "+separator);
}
}

just()

创建指定元素的Mono/Flux

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
ReactorLogUtil.separator("just()");
Mono<String> monoJ1 = Mono.just("Aaron");
Flux<String> fluxJ1 = Flux.just("Tony");
Flux<String> fluxJ2 = Flux.just("Cat", "Dog", "Human");
Flux<String> fluxJ3 = Flux.just();
ReactorLogUtil.log("monoJ1", monoJ1);
ReactorLogUtil.log("fluxJ1", fluxJ1);
ReactorLogUtil.log("fluxJ2", fluxJ2);
ReactorLogUtil.log("fluxJ3", fluxJ3);
}

输出如下

1
2
3
4
5
-------------------- just() --------------------
<Mono> monoJ1 : Aaron
<Flux> fluxJ1 : [Tony]
<Flux> fluxJ2 : [Cat, Dog, Human]
<Flux> fluxJ3 : []

defer()/fromSupplier()

当Mono/Flux真正被需要、订阅时,Supplier的get()方法才会被调用,即真正需要时才创建Mono/Flux

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws InterruptedException {
ReactorLogUtil.separator("defer() / fromSupplier()");
Mono<LocalTime> mt1 = Mono.just(LocalTime.now());
Mono<LocalTime> mt2 = Mono.defer( ()->Mono.just(LocalTime.now()) );
Mono<LocalTime> mt3 = Mono.fromSupplier(() -> LocalTime.now());

Flux<LocalTime> ft1 = Flux.just(LocalTime.now());
Flux<LocalTime> ft2 = Flux.defer( ()->Flux.just(LocalTime.now()) );
Thread.sleep(5*1000);

ReactorLogUtil.log("mt1", mt1);
ReactorLogUtil.log("mt2", mt2);
ReactorLogUtil.log("mt3", mt3);

System.out.println("\n"+"=".repeat(50)+"\n");

ReactorLogUtil.log("ft1", ft1);
ReactorLogUtil.log("ft2", ft2);
}

输出如下

1
2
3
4
5
6
7
8
9
-------------------- defer() / fromSupplier() --------------------
<Mono> mt1 : 14:03:14.174406
<Mono> mt2 : 14:03:19.455076
<Mono> mt3 : 14:03:19.464879

==================================================

<Flux> ft1 : [14:03:14.358709]
<Flux> ft2 : [14:03:19.466663]

empty()

创建空Mono/Flux(0个元素)

1
2
3
4
5
6
7
public static void main(String[] args) throws InterruptedException {
ReactorLogUtil.separator("empty()");
Mono<?> monoE1 = Mono.empty();
Flux<?> fluxE1 = Flux.empty();
ReactorLogUtil.log("monoE1",monoE1 );
ReactorLogUtil.log("fluxE1", fluxE1);
}

输出如下

1
2
3
-------------------- empty() --------------------
<Mono> monoE1 : <Empty Mono>
<Flux> fluxE1 : []

justOrEmpty()

如果data不为null,则创建包含该元素的Mono;否则,创建空Mono

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("justOrEmpty()");
Mono<Integer> mje1 = Mono.justOrEmpty(null);
Mono<Integer> mje2 = Mono.justOrEmpty(69);
Mono<Integer> mje3 = Mono.justOrEmpty( Optional.ofNullable(null) );
Mono<Integer> mje4 = Mono.justOrEmpty( Optional.of(119) );

ReactorLogUtil.log( "mje1", mje1 );
ReactorLogUtil.log( "mje2", mje2 );
ReactorLogUtil.log( "mje3", mje3 );
ReactorLogUtil.log( "mje4", mje4 );
}

输出如下

1
2
3
4
5
-------------------- justOrEmpty() --------------------
<Mono> mje1 : <Empty Mono>
<Mono> mje2 : 69
<Mono> mje3 : <Empty Mono>
<Mono> mje4 : 119

error()

创建不包含任何元素,直接发出onError信号的Mono/Flux

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("error()");
Mono<Object> me1 = Mono.error(new RuntimeException("服务异常"));
Mono<Object> me2 = Mono.error(() -> new IllegalArgumentException("缺少主键"));
Flux<Object> fe1 = Flux.error( new RuntimeException("资源有限") );
Flux<Object> fe2 = Flux.error(() -> new IllegalArgumentException("缺少名称"));

ReactorLogUtil.log("me1", me1);
ReactorLogUtil.log("me2", me2);
ReactorLogUtil.log("fe1", fe1);
ReactorLogUtil.log("fe2", fe2);
}

输出如下

1
2
3
4
5
-------------------- error() --------------------
<Mono> me1 : <Throw java.lang.RuntimeException: 服务异常>
<Mono> me2 : <Throw java.lang.IllegalArgumentException: 缺少主键>
<Flux> fe1 : <Throw java.lang.RuntimeException: 资源有限>
<Flux> fe2 : <Throw java.lang.IllegalArgumentException: 缺少名称>

never()

创建永远不会发出任何信号、元素的Mono/Flux。通常用于测试超时逻辑

1
2
3
4
5
6
7
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("never()");
Mono<Object> mn1 = Mono.never();
Flux<Object> fn1 = Flux.never();
ReactorLogUtil.log("mn1", mn1);
ReactorLogUtil.log("fn1", fn1);
}

输出如下

1
2
3
-------------------- never() --------------------
<Mono> mn1 : <Timeout>
<Flux> fn1 : <Timeout>

fromArray()/fromIterable()/fromStream()

从 数组/集合/Stream流 创建 Flux

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("fromXxx()");

// fromArray(): 从数组创建Flux
String[] names = {"张三", "李四", "王二麻子"};
Flux<String> ff1 = Flux.fromArray(names);
ReactorLogUtil.log("ff1", ff1);

// fromIterable(): 从集合创建Flux
List<Integer> prices = List.of(37,18,22);
Flux<Integer> ff2 = Flux.fromIterable(prices);
Set<Integer> priceSet = Set.of(37,18,22);
Flux<Integer> ff3 = Flux.fromIterable(priceSet);
ReactorLogUtil.log("ff2", ff2);
ReactorLogUtil.log("ff3", ff3);

// fromStream(): 从Stream流创建Flux
Stream<String> strStream = Stream.of("Tina", "Lucy", "Kimi");
Flux<String> ff4 = Flux.fromStream(strStream);
ReactorLogUtil.log("ff4", ff4);
}

输出如下

1
2
3
4
5
-------------------- fromXxx() --------------------
<Flux> ff1 : [张三, 李四, 王二麻子]
<Flux> ff2 : [37, 18, 22]
<Flux> ff3 : [22, 37, 18]
<Flux> ff4 : [Tina, Lucy, Kimi]

range(start,count)

创建包含 [start, start+count) 范围下的count个元素的Flux

1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("range()");
Flux<Integer> fr1 = Flux.range(10, 4);
Flux<Integer> fr2 = Flux.range(3, 0);
Flux<Integer> fr3 = Flux.range(5, 1);
ReactorLogUtil.log("fr1",fr1);
ReactorLogUtil.log("fr2",fr2);
ReactorLogUtil.log("fr3",fr3);
}

输出如下

1
2
3
4
-------------------- range() --------------------
<Flux> fr1 : [10, 11, 12, 13]
<Flux> fr2 : []
<Flux> fr3 : [5]

interval()

创建无限流Flux。其按指定间隔发出从0开始递增的元素

1
2
3
4
5
6
7
8
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("interval()");
Flux<Long> fi1 = Flux.interval(Duration.ofSeconds(2))
// 最多取4个元素
.take(4)
.doOnNext(v-> System.out.println(LocalTime.now() + " -->> "+v));
ReactorLogUtil.log("fi1",fi1);
}

输出如下

1
2
3
4
5
6
-------------------- interval() --------------------
14:25:13.099309 -->> 0
14:25:15.097910 -->> 1
14:25:17.097782 -->> 2
14:25:19.096968 -->> 3
<Flux> fi1 : [0, 1, 2, 3]

generate()

创建按指定逻辑生成、发出元素的Flux

generate(generator)版本:直接使用SynchronousSink的next()方法每次发出一个指定元素

1
2
3
4
5
6
7
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("generate()");
Flux<Double> fg1 = Flux.generate( (SynchronousSink<Double> sink) -> sink.next( Math.random() ) )
// 最多取3个元素
.take(3);
ReactorLogUtil.log("fg1", fg1);
}

输出如下

1
2
-------------------- generate() --------------------
<Flux> fg1 : [0.12924511843509945, 0.3527570726338465, 0.21543168691524528]

generate(stateSupplier, generator)版本:该版本支持状态管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("generate()");

// 创建生成斐波那契数列的Flux
Flux<Long> fg2 = Flux.generate(
// 初始状态
() -> new long[]{0,1},
(long[] currentState, SynchronousSink<Long> sink) -> {
// 发出当前状态的第一个值
sink.next( currentState[0] );
// 计算下一个状态
long newFirst = currentState[1];
long newSecond = currentState[0]+currentState[1];
long[] newState = new long[]{newFirst, newSecond};
return newState;
}
).take(10);
ReactorLogUtil.log("fg2", fg2);
}

输出如下

1
2
-------------------- generate() --------------------
<Flux> fg2 : [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

delay()

创建一个Mono,其会在延迟指定时间后发出一个值为0的元素。常用作延时信号

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("delay()");

System.out.println(LocalTime.now() + ": pre");
Mono<Long> m1 = Mono.delay(Duration.ofSeconds(20));
System.out.println(LocalTime.now() + ": wait");

ReactorLogUtil.log("m1", m1);
System.out.println(LocalTime.now() + ": complete");
}

输出如下

1
2
3
4
5
-------------------- delay() --------------------
14:39:23.458566: pre
14:39:23.645992: wait
<Mono> m1 : 0
14:39:43.672555: complete

终端操作符

中间操作符都是惰性的。只有终端操作符才会触发整个响应式流的执行。订阅信号会从消费者反向传播到源头的发布者,启动数据流动

subscribe()

subscribe(): 订阅流并启动它,但不对流发出的任何元素、错误、完成信号进行显式处理

1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("subscribe()");

Flux.just(37,25)
.doOnNext(num-> System.out.println("onNext: " + num))
.subscribe();

Thread.sleep(1000*3);
}

输出如下

1
2
3
-------------------- subscribe() --------------------
onNext: 37
onNext: 25

subscribe(Consumer<? super T> consumer): 订阅流并启动它。使用指定的consumer对流发出的元素进行处理。由于没有提供错误消费者,故如果流发出错误,则错误会被传播到Reactor的全局错误处理机制

1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("subscribe()");

Flux.just("18","Test","24")
.map( Integer::parseInt )
.subscribe( num-> System.out.println("num: "+num) );

Thread.sleep(1000*3);
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-------------------- subscribe() --------------------
num: 18
15:16:41.404 [main] ERROR reactor.core.publisher.Operators -- Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NumberFormatException: For input string: "Test"
Caused by: java.lang.NumberFormatException: For input string: "Test"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:67)
at java.base/java.lang.Integer.parseInt(Integer.java:668)
at java.base/java.lang.Integer.parseInt(Integer.java:786)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
...

subscribe(consumer, errorConsumer): 订阅流并启动它。使用consumer、errorConsumer分别对发出的元素、错误进行处理

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("subscribe()");

Flux.just("18","Test","24")
.map( Integer::parseInt )
.subscribe(
v -> System.out.println("num: "+v),
e -> System.out.println("happen e: "+e)
);

Thread.sleep(1000*3);
}

输出如下

1
2
3
-------------------- subscribe() --------------------
num: 18
happen e: java.lang.NumberFormatException: For input string: "Test"

subscribe(consumer, errorConsumer, completeConsumer): 订阅流并启动它。使用consumer、errorConsumer、completeConsumer分别对发出的元素、错误、onComplete信号进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("subscribe()");

Flux.just("18","69","24")
.map( Integer::parseInt )
.subscribe(
v -> System.out.println("num: "+v),
e -> System.out.println("happen e: "+e),
() -> System.out.println("done")
);

Thread.sleep(1000*3);
}

输出如下

1
2
3
4
5
-------------------- subscribe() --------------------
num: 18
num: 69
num: 24
done

可通过subscribe()返回的Disposable对象的dispose方法主动取消订阅

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("subscribe()");

Disposable subscribe = Flux.interval(Duration.ofSeconds(1))
.log("#1")
.subscribe(v -> System.out.println("v: " + v));
Thread.sleep(1000*5);

// 取消订阅
subscribe.dispose();
Thread.sleep(1000*5);
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-------------------- subscribe() --------------------
15:20:38.986 [main] INFO #1 -- onSubscribe(FluxInterval.IntervalRunnable)
15:20:38.990 [main] INFO #1 -- request(unbounded)
15:20:40.000 [parallel-1] INFO #1 -- onNext(0)
v: 0
15:20:40.997 [parallel-1] INFO #1 -- onNext(1)
v: 1
15:20:41.997 [parallel-1] INFO #1 -- onNext(2)
v: 2
15:20:42.994 [parallel-1] INFO #1 -- onNext(3)
v: 3
15:20:43.995 [parallel-1] INFO #1 -- onNext(4)
v: 4
15:20:44.004 [main] INFO #1 -- cancel()

block()

阻塞当前线程来获取Mono的结果。生产中强烈不推荐使用该方式获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("block()");
String name1 = Mono.just("Aaron")
.block();
System.out.println("name1 : " + name1);

// 空Mono会返回null
Object name2 = Mono.empty()
.block();
System.out.println("name2 : " + name2);

// 出现错误的Mono会抛出相应异常
Object name3 = null;
try{
name3 = Mono.error(new IllegalArgumentException("Error Name"))
.block();
}catch (Exception ex) {
System.out.println("name3 Happen ex: " + ex);
}
}

输出如下

1
2
3
4
-------------------- block() --------------------
name1 : Aaron
name2 : null
name3 Happen ex: java.lang.IllegalArgumentException: Error Name

blockFirst()/blockLast()

阻塞当前线程来获取Flux的第一个元素/最后一个元素。生产中强烈不推荐使用该方式获取数据

1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("blockFirst()/blockLast()");
Integer num1 = Flux.just(11,22,33)
.blockFirst();
Integer num2 = Flux.just(11, 22, 33)
.blockLast();
System.out.println("num1: " + num1);
System.out.println("num2: " + num2);
}

输出如下

1
2
3
-------------------- blockFirst()/blockLast() --------------------
num1: 11
num2: 33

toStream()

将Flux转为Java Stream。toStream()本身并不会立即启动数据流,其依然是惰性的。只有当Stream的终端操作被调用时,整个流才会被启动,同时会阻塞当前线程。故生产中应尽量避免使用该方法

1
2
3
4
5
6
7
8
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("toStream()");
List<Integer> nums = Flux.just(18, 35, 24, 69, 14, 6, 65)
.toStream()
.filter(num -> num > 30)
.toList();
System.out.println("nums: " + nums);
}

输出如下

1
2
-------------------- toStream() --------------------
nums: [35, 69, 65]

toIterable()

将Flux转为Java Iterable。toIterable()本身并不会立即启动数据流,其依然是惰性的。只有对返回的Iterable开始迭代时,整个流才会被启动,同时会阻塞当前线程。故生产中应尽量避免使用该方法

1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("toIterable()");
Iterable<Integer> iterable = Flux.just(18, 35, 24)
.log("#1")
.toIterable();

System.out.println("准备开始迭代...");
iterable.forEach( num -> System.out.println("num: " + num));
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
-------------------- toIterable() --------------------
准备开始迭代...
15:25:49.381 [main] INFO #1 -- | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
15:25:49.384 [main] INFO #1 -- | request(256)
15:25:49.385 [main] INFO #1 -- | onNext(18)
15:25:49.385 [main] INFO #1 -- | onNext(35)
15:25:49.386 [main] INFO #1 -- | onNext(24)
15:25:49.386 [main] INFO #1 -- | onComplete()
num: 18
num: 35
num: 24

toFuture()

将Mono转为Java CompletableFuture。其会立即启动数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("toFuture()");

Mono<Integer> ageMono = Mono.just(69)
.delayElement(Duration.ofSeconds(2))
.log();

System.out.println(LocalTime.now()+ " #1");
CompletableFuture<Integer> future1 = ageMono.toFuture();
System.out.println(LocalTime.now()+ " #2");
future1.thenAccept( age-> System.out.println("age: "+age) );
Thread.sleep(1000*5);

Mono<Integer> m1 = Mono.just("Aaron")
.map(Integer::parseInt);
CompletableFuture<Integer> future2 = m1.toFuture();
future2.thenAccept( num-> System.out.println("num: "+num) )
.exceptionally( e-> {
System.out.println("happen ex: "+e.getMessage());
return null;
});
Thread.sleep(1000*3);
}

输出如下

1
2
3
4
5
6
7
8
9
-------------------- toFuture() --------------------
15:30:03.500899 #1
15:30:03.523 [main] INFO reactor.Mono.DelayElement.1 -- onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
15:30:03.528 [main] INFO reactor.Mono.DelayElement.1 -- request(unbounded)
15:30:03.529683 #2
15:30:05.532 [parallel-1] INFO reactor.Mono.DelayElement.1 -- onNext(69)
age: 69
15:30:05.533 [parallel-1] INFO reactor.Mono.DelayElement.1 -- onComplete()
happen ex: java.lang.NumberFormatException: For input string: "Aaron"
请我喝杯咖啡捏~

欢迎关注我的微信公众号:青灯抽丝