0%

Spring WebFlux之组合/延迟操作符

这里介绍Spring WebFlux中的组合/延迟操作符

abstract.PNG

打印工具

为了方便可视化,辅助打印工具如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* 辅助打印工具
*/
public class ReactorLogUtil {
private static Duration timeout = Duration.ofSeconds(60);

private static final String separator = "-".repeat(20);

public static <T> void log(String varName, Mono<T> mono) {
Object res = null;
try {
// block(): 阻塞获取
res = mono.block(timeout);
}catch (Exception ex) {
String msg = Optional.ofNullable(ex.getMessage())
.map(String::toLowerCase)
.orElse("");
if( msg.contains("timeout") ) {
res = "<Timeout>";
} else {
res = "<Throw "+ex.toString()+">";
}
}


String msg = new StringBuilder()
.append("<Mono> ")
.append(varName)
.append(" : ")
.append( Optional.ofNullable(res).orElse("<Empty Mono>") )
.toString();
System.out.println(msg);
}

public static <T> void log(String varName, Flux<T> flux) {
String res = null;
try{
Mono<List<T>> listMono = flux.collectList();
// block(): 阻塞获取
List<T> list = listMono.block(timeout);
res = list.toString();
}catch (Exception ex) {
if( ex.getMessage().toLowerCase().contains("timeout") ) {
res = "<Timeout>";
} else {
res = "<Throw "+ex.toString()+">";
}
}
String msg = new StringBuilder()
.append("<Flux> ")
.append(varName)
.append(" : ")
.append( res )
.toString();
System.out.println(msg);
}

public static void separator(String title) {
System.out.println("\n"+separator+" "+title+" "+separator);
}
}

组合操作符

merge()/mergeWith()

将多个Mono/Flux中的元素合并到一个Flux。元素不保证顺序, 先到达的排在前面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("merge()/mergeWith()");

Mono<String> m1 = Mono.just("Tony")
.delayElement(Duration.ofMillis(400));
Mono<String> m2 = Mono.just("69")
.delayElement(Duration.ofMillis(100));

Flux<String> f1 = Flux.just("A","B","C")
.delayElements(Duration.ofMillis(120));
Flux<String> f2 = Flux.just("1", "2", "3")
.delayElements(Duration.ofMillis(100));

Flux<String> fm12 = m1.mergeWith(m2);
ReactorLogUtil.log("fm12", fm12);

Flux<?> fm1 = Flux.merge(f1, f2);
Flux<?> fm2 = f1.mergeWith(f2);
ReactorLogUtil.log("f12", fm1);
ReactorLogUtil.log("f12", fm2);
}

输出如下

1
2
3
4
-------------------- merge()/mergeWith() --------------------
<Flux> fm12 : [69, Tony]
<Flux> f12 : [1, A, 2, B, 3, C]
<Flux> f12 : [1, A, 2, B, 3, C]

concat()/concatWith()

将多个Mono/Flux中的元素合并到一个Flux中。元素顺序为指定的Mono/Flux拼接顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("concat()/concatWith()");

Mono<String> m1 = Mono.just("Tony")
.delayElement(Duration.ofMillis(400));
Mono<String> m2 = Mono.just("69")
.delayElement(Duration.ofMillis(100));
Flux<String> f1 = Flux.just("A","B","C")
.delayElements(Duration.ofMillis(120));
Flux<String> f2 = Flux.just("1", "2", "3")
.delayElements(Duration.ofMillis(100));

Flux<String> fc12 = m1.concatWith(m2);
ReactorLogUtil.log("fc12", fc12);

Flux<String> fc1 = Flux.concat(f1, f2);
Flux<String> fc2 = f1.concatWith(f2);
ReactorLogUtil.log("fc1", fc1);
ReactorLogUtil.log("fc2", fc2);
}

输出如下

1
2
3
4
-------------------- concat()/concatWith() --------------------
<Flux> fc12 : [Tony, 69]
<Flux> fc1 : [A, B, C, 1, 2, 3]
<Flux> fc2 : [A, B, C, 1, 2, 3]

zip()/zipWith()

将各Flux/Mono中对应索引位置的元素组合在一起。支持自定义组合逻辑, 默认组合为Tuple元组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("zip()/zipWith()");

Mono<String> m1 = Mono.just("Tony")
.delayElement(Duration.ofMillis(400));
Mono<String> m2 = Mono.just("69")
.delayElement(Duration.ofMillis(100));
Flux<String> f1 = Flux.just("A","B","C")
.delayElements(Duration.ofMillis(120));
Flux<String> f2 = Flux.just("1", "2", "3")
.delayElements(Duration.ofMillis(100));

Mono<Tuple2<String, String>> mz1 = Mono.zip(m1, m2);
Mono<String> mz2 = Mono.zip(m1, m2, (name, age)->name.toUpperCase()+" is "+age);
ReactorLogUtil.log("mz1", mz1);
ReactorLogUtil.log("mz2", mz2);

Flux<Tuple2<String, String>> fz1 = Flux.zip(f1, f2);
Flux<String> fz2 = f1.zipWith(f2, (f1e, f2e) -> f1e.toLowerCase() + f2e);
ReactorLogUtil.log("fz1", fz1);
ReactorLogUtil.log("fz2", fz2);
}

输出如下

1
2
3
4
5
-------------------- zip()/zipWith() --------------------
<Mono> mz1 : [Tony,69]
<Mono> mz2 : TONY is 69
<Flux> fz1 : [[A,1], [B,2], [C,3]]
<Flux> fz2 : [a1, b2, c3]

defaultIfEmpty()

上游流为空时,返回默认值

1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("defaultIfEmpty()");
Flux<String> fd1 = Flux.just("Aaron", "Bob")
.defaultIfEmpty("Tina");
Flux<?> fd2 = Flux.empty()
.defaultIfEmpty("Tina");
ReactorLogUtil.log("fd1", fd1);
ReactorLogUtil.log("fd2", fd2);
}

输出如下

1
2
3
-------------------- defaultIfEmpty() --------------------
<Flux> fd1 : [Aaron, Bob]
<Flux> fd2 : [Tina]

switchIfEmpty()

上游流为空时,切换到备用流

1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("switchIfEmpty()");
Flux<String> fs1 = Flux.just("Aaron", "Bob")
.switchIfEmpty( Flux.just("11", "22") );
Flux<?> fs2 = Flux.empty()
.switchIfEmpty( Flux.just("11", "22") );
ReactorLogUtil.log("fs1", fs1);
ReactorLogUtil.log("fs2", fs2);
}

输出如下

1
2
3
-------------------- switchIfEmpty() --------------------
<Flux> fs1 : [Aaron, Bob]
<Flux> fs2 : [11, 22]

switchOnFirst()

根据第一个元素决定,是使用原始流还是备用流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("switchOnFirst()");
Flux<String> ff1 = Flux.just("Start", "Data1", "Data2", "End")
// Note:flux中包含第一个元素
.switchOnFirst((signal, flux) -> {
// 第一个元素符合预期,使用原始流
if( signal.hasValue() && "Start".equals(signal.get()) ){
return flux.map(String::toUpperCase);
} else {
// 第一个元素不符合预期,使用备用流
return Flux.just("Error1", "Error 2");
}
});

Flux<String> ff2 = Flux.just("Data1", "Data2", "End")
.switchOnFirst((signal, flux) -> {
if( signal.hasValue() && "Start".equals(signal.get()) ){
return flux.map(String::toUpperCase);
} else {
return Flux.just("Error1", "Error 2");
}
});

ReactorLogUtil.log("ff1", ff1);
ReactorLogUtil.log("ff2", ff2);
}

输出如下

1
2
3
-------------------- switchOnFirst() --------------------
<Flux> ff1 : [START, DATA1, DATA2, END]
<Flux> ff2 : [Error1, Error 2]

startWith()

在流的最前面添加指定元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("startWith()");

Flux<String> fst1 = Flux.just("11", "33", "55")
.startWith("Hello", "World");
Flux<String> fst2 = Flux.just("11", "33", "55")
.startWith( List.of("-1", "-2") );
Flux<String> fst3 = Flux.just("11", "33", "55")
.startWith( Flux.just("Aaron","Bob") );

ReactorLogUtil.log("fst1", fst1);
ReactorLogUtil.log("fst2", fst2);
ReactorLogUtil.log("fst3", fst3);
}

输出如下

1
2
3
4
-------------------- startWith() --------------------
<Flux> fst1 : [Hello, World, 11, 33, 55]
<Flux> fst2 : [-1, -2, 11, 33, 55]
<Flux> fst3 : [Aaron, Bob, 11, 33, 55]

then()

当收到上游的onComplete信号后,其会返回一个 空Mono 或 指定Mono; 如果上游出现了onError信号,其则会将该错误信号继续向下游传递。适用于不关心上游返回的数据,只关心是否正常完成的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("then()");

Mono<Void> ft1 = Flux.just("Bob", "Tina")
.map(String::toUpperCase)
.doOnNext( str-> System.out.println("ft1 : " + str) )
.then();
ReactorLogUtil.log("ft1", ft1);

Mono<Void> ft2 = Flux.just("23","Aaron")
.map( Integer::parseInt )
.doOnNext( num-> System.out.println("ft2 : " + num) )
.then();
ReactorLogUtil.log("ft2", ft2);

Mono<String> ft3 = Flux.just("11", "33")
.map( Integer::parseInt )
.doOnNext( num-> System.out.println("ft3 : " + num) )
.then(Mono.just("Success"));
ReactorLogUtil.log("ft3", ft3);

Mono<String> ft4 = Flux.just("23","Aaron")
.map( Integer::parseInt )
.doOnNext( num-> System.out.println("ft4 : " + num) )
.then(Mono.just("Success"));
ReactorLogUtil.log("ft4", ft4);
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
-------------------- then() --------------------
ft1 : BOB
ft1 : TINA
<Mono> ft1 : <Empty Mono>
ft2 : 23
<Mono> ft2 : <Throw java.lang.NumberFormatException: For input string: "Aaron">
ft3 : 11
ft3 : 33
<Mono> ft3 : Success
ft4 : 23
<Mono> ft4 : <Throw java.lang.NumberFormatException: For input string: "Aaron">

thenEmpty()

当收到上游的onComplete信号后,其会执行另一个异步动作。且该异步动作最终返回的也是空Mono; 如果上游出现了onError信号,其则会将该错误信号继续向下游传递。适用于当上游正常结束后,需要执行其他任务的场景。例如:记录日志、发送通知等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("thenEmpty()");

Mono<Void> fte1 = Flux.just("22", "44")
.map(Integer::parseInt)
.doOnNext( num-> System.out.println("fte1 #1 : " + num) )
.thenEmpty(
Mono.just("Clean Resource")
.doOnNext(str -> System.out.println("fte1 #2 : " + str))
.then()
);
ReactorLogUtil.log("fte1", fte1);

Mono<Void> fte2 = Flux.just("33", "Aaron")
.map(Integer::parseInt)
.doOnNext( num-> System.out.println("fte2 #1 : " + num) )
.thenEmpty(
Mono.just("Clean Resource")
.doOnNext(str -> System.out.println("fte2 #2 : " + str))
.then()
);
ReactorLogUtil.log("fte2", fte2);
}

输出如下

1
2
3
4
5
6
7
-------------------- thenEmpty() --------------------
fte1 #1 : 22
fte1 #1 : 44
fte1 #2 : Clean Resource
<Mono> fte1 : <Empty Mono>
fte2 #1 : 33
<Mono> fte2 : <Throw java.lang.NumberFormatException: For input string: "Aaron">

thenMany()

当收到上游的onComplete信号后,其会执行另一个异步动作。且该异步动作最终返回的是一个Flux; 如果上游出现了onError信号,其则会将该错误信号继续向下游传递。适用于 当上游前置任务成功完成后,才可以进行下一步的业务动作。例如:前置的系统初始化、身份验证等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("thenMany()");

Flux<Integer> ftm1 = Mono.just("Aaron")
.map(String::toUpperCase)
.doOnSuccess(name -> System.out.println(name + ": System Init Done"))
.thenMany(Flux.just("17", "25", "32")
.map(Integer::parseInt)
);
ReactorLogUtil.log("ftm1", ftm1);

Flux<Integer> ftm2 = Mono.just("Aaron")
.map( Integer::parseInt )
.doOnSuccess(name -> System.out.println(name + ": System Init Done"))
.thenMany(Flux.just("69")
.map(Integer::parseInt)
);
ReactorLogUtil.log("ftm2", ftm2);
}

输出如下

1
2
3
4
-------------------- thenMany() --------------------
AARON: System Init Done
<Flux> ftm1 : [17, 25, 32]
<Flux> ftm2 : <Throw java.lang.NumberFormatException: For input string: "Aaron">

延迟操作符

delaySubscription()

上游所有操作符(含自身)按 指定时间/另一个发布者的信号 来延迟订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("delaySubscription()");

Flux.just("Tina")
.log("#1")
// 延迟指定时间后,delaySubscription及其上游的所有操作符才开始订阅
.delaySubscription( Duration.ofSeconds(4) )
.log("#2")
.subscribe();
Thread.sleep(1000*8);

System.out.println("\n-----------------------------------\n");

Mono.just("69")
.log("#A")
// 等待另一个发布者发出完成信号,delaySubscription及其上游的所有操作符才开始订阅
.delaySubscription(
Mono.just("S")
.delayElement(Duration.ofSeconds(5))
.log(" sub publisher")
).log("#B")
.subscribe();
Thread.sleep(1000*8);
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-------------------- delaySubscription() --------------------
16:08:14.308 [main] INFO #2 -- onSubscribe(FluxDelaySubscription.DelaySubscriptionOtherSubscriber)
16:08:14.314 [main] INFO #2 -- request(unbounded)
16:08:18.339 [parallel-1] INFO #1 -- | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
16:08:18.340 [parallel-1] INFO #1 -- | request(unbounded)
16:08:18.341 [parallel-1] INFO #1 -- | onNext(Tina)
16:08:18.341 [parallel-1] INFO #2 -- onNext(Tina)
16:08:18.342 [parallel-1] INFO #1 -- | onComplete()
16:08:18.342 [parallel-1] INFO #2 -- onComplete()

-----------------------------------

16:08:22.331 [main] INFO sub publisher -- onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
16:08:22.331 [main] INFO #B -- onSubscribe(FluxDelaySubscription.DelaySubscriptionOtherSubscriber)
16:08:22.331 [main] INFO #B -- request(unbounded)
16:08:22.331 [main] INFO sub publisher -- request(unbounded)
16:08:27.337 [parallel-2] INFO sub publisher -- onNext(S)
16:08:27.341 [parallel-2] INFO sub publisher -- cancel()
16:08:27.342 [parallel-2] INFO #A -- | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
16:08:27.342 [parallel-2] INFO #A -- | request(unbounded)
16:08:27.342 [parallel-2] INFO #A -- | onNext(69)
16:08:27.342 [parallel-2] INFO #B -- onNext(69)
16:08:27.342 [parallel-2] INFO #A -- | onComplete()
16:08:27.343 [parallel-2] INFO #B -- onComplete()
16:08:27.343 [parallel-2] INFO sub publisher -- onComplete()

delayElement()/delayElements()

延迟Mono/Flux中每个元素(即onNext信号)的发出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("delayElement()/delayElements()");
Mono.just("Tony")
.log("delayElement #1")
.map( String::toUpperCase )
.log("delayElement #2")
// 每个元素延迟3秒后发出
.delayElement( Duration.ofSeconds(3) )
.log("delayElement #3")
.subscribe();
Thread.sleep(1000*5);

System.out.println("\n-----------------------------------\n");

Flux.just(11,22,33)
.log("delayElements #A")
.delayElements(Duration.ofSeconds(2))
.log("delayElements #B")
.subscribe();
Thread.sleep(1000*8);
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
-------------------- delayElement()/delayElements() --------------------
16:10:57.582 [main] INFO delayElement #1 -- | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
16:10:57.586 [main] INFO delayElement #2 -- | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
16:10:57.587 [main] INFO delayElement #3 -- onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
16:10:57.587 [main] INFO delayElement #3 -- request(unbounded)
16:10:57.587 [main] INFO delayElement #2 -- | request(unbounded)
16:10:57.588 [main] INFO delayElement #1 -- | request(unbounded)
16:10:57.589 [main] INFO delayElement #1 -- | onNext(Tony)
16:10:57.589 [main] INFO delayElement #2 -- | onNext(TONY)
16:10:57.590 [main] INFO delayElement #1 -- | onComplete()
16:10:57.591 [main] INFO delayElement #2 -- | onComplete()
16:11:00.597 [parallel-1] INFO delayElement #3 -- onNext(TONY)
16:11:00.599 [parallel-1] INFO delayElement #3 -- onComplete()

-----------------------------------

16:11:02.669 [main] INFO delayElements #A -- | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
16:11:02.669 [main] INFO delayElements #B -- onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
16:11:02.669 [main] INFO delayElements #B -- request(unbounded)
16:11:02.669 [main] INFO delayElements #A -- | request(1)
16:11:02.669 [main] INFO delayElements #A -- | onNext(11)
16:11:04.678 [parallel-2] INFO delayElements #B -- onNext(11)
16:11:04.679 [parallel-2] INFO delayElements #A -- | request(1)
16:11:04.679 [parallel-2] INFO delayElements #A -- | onNext(22)
16:11:06.684 [parallel-3] INFO delayElements #B -- onNext(22)
16:11:06.684 [parallel-3] INFO delayElements #A -- | request(1)
16:11:06.685 [parallel-3] INFO delayElements #A -- | onNext(33)
16:11:06.685 [parallel-3] INFO delayElements #A -- | onComplete()
16:11:08.690 [parallel-4] INFO delayElements #B -- onNext(33)
16:11:08.691 [parallel-4] INFO delayElements #B -- onComplete()

delaySequence()

仅延迟第一个元素的发出时间,后续元素的发出按元素原有的时间间隔。相当于, 将响应式流的所有元素在时间轴上向右平移指定的时长, 而元素之间原有的时间间隔保持不变

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("delaySequence()");

Flux.just(1,2,3)
.delayElements(Duration.ofSeconds(4))
.log("#1")
// 第一个元素延迟指定时间后发出,后续元素按原有间隔发出
.delaySequence( Duration.ofSeconds(10) )
.log("#2")
.subscribe();

Thread.sleep(1000*30);
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
-------------------- delaySequence() --------------------
16:17:51.358 [main] INFO #1 -- onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
16:17:51.362 [main] INFO #2 -- onSubscribe(SerializedSubscriber)
16:17:51.363 [main] INFO #2 -- request(unbounded)
16:17:51.363 [main] INFO #1 -- request(unbounded)
16:17:55.377 [parallel-1] INFO #1 -- onNext(1)
16:17:59.389 [parallel-3] INFO #1 -- onNext(2)
16:18:03.396 [parallel-4] INFO #1 -- onNext(3)
16:18:03.400 [parallel-4] INFO #1 -- onComplete()
16:18:05.389 [parallel-2] INFO #2 -- onNext(1)
16:18:09.393 [parallel-2] INFO #2 -- onNext(2)
16:18:13.401 [parallel-2] INFO #2 -- onNext(3)
16:18:13.402 [parallel-2] INFO #2 -- onComplete()

delayUntil()

每个元素发出前,延迟等待一个异步操作完成

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("delayUntil()");

Flux.just(1,2,3)
.log("#1")
.delayUntil( num -> Mono.just("gg").delayElement(Duration.ofSeconds(num)))
.log("#2")
.subscribe();

Thread.sleep(1000*10);
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-------------------- delayUntil() --------------------
16:21:07.597 [main] INFO #1 -- | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
16:21:07.599 [main] INFO #2 -- onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
16:21:07.600 [main] INFO #2 -- request(unbounded)
16:21:07.600 [main] INFO #1 -- | request(1)
16:21:07.600 [main] INFO #1 -- | onNext(1)
16:21:08.654 [parallel-1] INFO #2 -- onNext(1)
16:21:08.655 [parallel-1] INFO #1 -- | request(1)
16:21:08.656 [parallel-1] INFO #1 -- | onNext(2)
16:21:10.661 [parallel-2] INFO #2 -- onNext(2)
16:21:10.661 [parallel-2] INFO #1 -- | request(1)
16:21:10.661 [parallel-2] INFO #1 -- | onNext(3)
16:21:10.662 [parallel-2] INFO #1 -- | onComplete()
16:21:13.667 [parallel-3] INFO #2 -- onNext(3)
16:21:13.667 [parallel-3] INFO #2 -- onComplete()
请我喝杯咖啡捏~

欢迎关注我的微信公众号:青灯抽丝