0%

Spring WebFlux之转换操作符

这里介绍Spring WebFlux中的转换操作符

abstract.PNG

打印工具

为了方便可视化,辅助打印工具如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* 辅助打印工具
*/
public class ReactorLogUtil {
private static Duration timeout = Duration.ofSeconds(60);

private static final String separator = "-".repeat(20);

public static <T> void log(String varName, Mono<T> mono) {
Object res = null;
try {
// block(): 阻塞获取
res = mono.block(timeout);
}catch (Exception ex) {
String msg = Optional.ofNullable(ex.getMessage())
.map(String::toLowerCase)
.orElse("");
if( msg.contains("timeout") ) {
res = "<Timeout>";
} else {
res = "<Throw "+ex.toString()+">";
}
}


String msg = new StringBuilder()
.append("<Mono> ")
.append(varName)
.append(" : ")
.append( Optional.ofNullable(res).orElse("<Empty Mono>") )
.toString();
System.out.println(msg);
}

public static <T> void log(String varName, Flux<T> flux) {
String res = null;
try{
Mono<List<T>> listMono = flux.collectList();
// block(): 阻塞获取
List<T> list = listMono.block(timeout);
res = list.toString();
}catch (Exception ex) {
if( ex.getMessage().toLowerCase().contains("timeout") ) {
res = "<Timeout>";
} else {
res = "<Throw "+ex.toString()+">";
}
}
String msg = new StringBuilder()
.append("<Flux> ")
.append(varName)
.append(" : ")
.append( res )
.toString();
System.out.println(msg);
}

public static void separator(String title) {
System.out.println("\n"+separator+" "+title+" "+separator);
}
}

map()

将每个元素T通过一个函数串行的、同步的转换为一个新元素R。故该函数中不可以存在被阻塞的情形,例如:网络请求、数据库查询等

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("map()");

Mono<Integer> m1 = Mono.just("69")
.map(str -> Integer.valueOf(str));
ReactorLogUtil.log("m1", m1);

Flux<String> f1 = Flux.just("Tina", "David", "Bob")
.map(str->str.length()<=3 ? str.toLowerCase() : str.toUpperCase());
ReactorLogUtil.log("f1", f1);
}

输出如下

1
2
3
-------------------- map() --------------------
<Mono> m1 : 69
<Flux> f1 : [TINA, DAVID, bob]

flatMap()

将各元素T通过一个函数并行的映射为新的响应式流 Mono / Flux。这个过程是异步的,当响应式流返回结果时,flatMap内部会自动进行压平,然后将结果元素R推到下游。特别地:对Flux使用flatMap,如果响应式流返回的是一个空Mono/Flux,其会直接忽略。显然,由于是并行的,故无法保证 输出结果的顺序 与 输入元素的顺序 是一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("flatMap()");

Mono<String> m1 = Mono.just("David")
.flatMap(name -> getAge(name));
ReactorLogUtil.log("m1", m1);

Flux<String> f1 = Flux.just("Tina", "Aaron", "Bob", "Amy")
.flatMap(name -> getAge(name));
ReactorLogUtil.log("f1", f1);
}

private static Mono<String> getAge(String name){
System.out.println(LocalTime.now() + " getAge param: " + name);

if( "Amy".equals(name) ) {
return Mono.empty();
}

Integer age = null;
Long delayTime = null;
switch (name) {
case "Aaron" -> {
age=3;
delayTime = 3L;
}
case "Bob" -> {
age = 1;
delayTime = 1L;
}
case "Tina" -> {
age = 5;
delayTime = 5L;
}
default -> {
age = 0;
delayTime = 0L;
}
}
return Mono.just(name + ">>>" + age)
.delayElement( Duration.ofSeconds(delayTime) );
}

输出如下

1
2
3
4
5
6
7
8
-------------------- flatMap() --------------------
16:47:41.624932 getAge param: David
<Mono> m1 : David>>>0
16:47:41.653836 getAge param: Tina
16:47:41.654409 getAge param: Aaron
16:47:41.654605 getAge param: Bob
16:47:41.654708 getAge param: Amy
<Flux> f1 : [Bob>>>1, Aaron>>>3, Tina>>>5]

concatMap()

flatMap()的串行版本。可保证输出结果的顺序与输入元素的顺序一致。即拿到第一个元素T1的结果R1;再发起第二个元素T2的转换,直到拿到结果R2后,才发起第三个元素T3的转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("concatMap()");
Flux<String> f1 = Flux.just("Tina", "Amy", "Aaron", "Bob")
.concatMap(name -> getAge(name));
ReactorLogUtil.log("f1", f1);
}

private static Mono<String> getAge(String name){
System.out.println(LocalTime.now() + " getAge param: " + name);

if( "Amy".equals(name) ) {
return Mono.empty();
}

Integer age = null;
Long delayTime = null;
switch (name) {
case "Aaron" -> {
age=3;
delayTime = 3L;
}
case "Bob" -> {
age = 1;
delayTime = 1L;
}
case "Tina" -> {
age = 5;
delayTime = 5L;
}
default -> {
age = 0;
delayTime = 0L;
}
}
return Mono.just(name + ">>>" + age)
.delayElement( Duration.ofSeconds(delayTime) );
}

输出如下

1
2
3
4
5
6
-------------------- concatMap() --------------------
16:44:11.823970 getAge param: Tina
16:44:16.862611 getAge param: Amy
16:44:16.863213 getAge param: Aaron
16:44:19.868758 getAge param: Bob
<Flux> f1 : [Tina>>>5, Aaron>>>3, Bob>>>1]

flatMapSequential()

既具备flatMap()的优点:并发执行;又具备concatMap()的优点:按序输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("flatMapSequential()");
Flux<String> f1 = Flux.just("Tina","Amy", "Aaron", "Bob")
.flatMapSequential(name -> getAge(name));
ReactorLogUtil.log("f1", f1);
}

private static Mono<String> getAge(String name){
System.out.println(LocalTime.now() + " getAge param: " + name);

if( "Amy".equals(name) ) {
return Mono.empty();
}

Integer age = null;
Long delayTime = null;
switch (name) {
case "Aaron" -> {
age=3;
delayTime = 3L;
}
case "Bob" -> {
age = 1;
delayTime = 1L;
}
case "Tina" -> {
age = 5;
delayTime = 5L;
}
default -> {
age = 0;
delayTime = 0L;
}
}
return Mono.just(name + ">>>" + age)
.delayElement( Duration.ofSeconds(delayTime) );
}

输出如下

1
2
3
4
5
6
-------------------- flatMapSequential() --------------------
16:46:26.446800 getAge param: Tina
16:46:26.465326 getAge param: Amy
16:46:26.465597 getAge param: Aaron
16:46:26.465878 getAge param: Bob
<Flux> f1 : [Tina>>>5, Aaron>>>3, Bob>>>1]

cast()

将每个元素强制转换为指定类型,无法转换会发生ClassCastException错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("cast()");

Mono<Integer> mc1 = Mono.just(18)
.cast(Integer.class);
Mono<Double> mc2 = Mono.just(18)
.cast(Double.class);
ReactorLogUtil.log("mc1", mc1);
ReactorLogUtil.log("mc2", mc2);

Flux<Integer> fc1 = Flux.just(2,3)
.cast(Integer.class);
Flux<Long> fc2 = Flux.just(4,"car")
.cast(Long.class);
ReactorLogUtil.log("fc1", fc1);
ReactorLogUtil.log("fc2", fc2);
}

输出如下

1
2
3
4
5
-------------------- cast() --------------------
<Mono> mc1 : 18
<Mono> mc2 : <Throw java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.Double>
<Flux> fc1 : [2, 3]
<Flux> fc2 : <Throw java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.Long>

handle()

其既具备同步转换元素的能力,也可以过滤筛选元素。相当于map+filter的组合

例如,我们期望将字符串转换为整数,转换失败的请忽略。第1种实现方式,使用map进行转换, 转换失败时需返回null。由于onNext 信号传递的元素不能为null,故其会发出NPE的onError信号。换言之,通过map操作符无法实现 1到0(1->0) 的映射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("handle()");
// 需求:将字符串转换为整数,转换失败的请忽略
// 实现1: 使用map进行转换, 转换失败时返回null
Flux<Integer> f5a = Flux.just("18", "Aaron", "69")
.map(str -> {
Integer num = null;
try {
num = Integer.parseInt(str);
} catch (Exception ex) {
// 忽略转换失败
System.out.println("Str 2 Integer fail: "+ str);
}
return num;
});
ReactorLogUtil.log("f5a", f5a);
}

输出如下

1
2
3
-------------------- handle() --------------------
Str 2 Integer fail: Aaron
<Flux> f5a : <Throw java.lang.NullPointerException: The mapper [com.aaron.reactive.service.operator.OperatorDemo$$Lambda$25/0x0000000136082798] returned a null value.>

第2种实现方式:使用flatMap进行转换。转换失败时, 通过返回空Mono 来实现 1到0(1->0) 的映射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("handle()");
// 需求:将字符串转换为整数,转换失败的请忽略
// 实现2: 使用flatMap进行转换。转换失败时, 通过返回空Mono 来实现 1到0(1->0) 的映射
Flux<Integer> f5b = Flux.just("18", "Aaron", "69")
.flatMap(str -> {
Integer num = null;
try {
num = Integer.parseInt(str);
} catch (Exception ex) {
// 忽略转换失败
System.out.println("Str 2 Integer fail: " + str);
}
return Mono.justOrEmpty(num);
});
ReactorLogUtil.log("f5b", f5b);
}

输出如下

1
2
3
-------------------- handle() --------------------
Str 2 Integer fail: Aaron
<Flux> f5b : [18, 69]

第3种方式通过handle来实现。显然,使用它来实现上述需求 比上面的基于flatMap的第2种实现方式更合适,因为后者是一个较重的异步操作。其中,SynchronousSink 的常用方法:

  • next(): 发出一个元素到下游。反之,如果需要过滤掉该元素,什么都不做就可以了。值得注意的是,对于每一个上游流过来的元素, sink.next()最多只能被执行一次。即其支持的映射是 1 到 0或1 (1 -> 0或1)
  • error(): 发出错误以终止流
  • complete(): 正常终止流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws Exception{
ReactorLogUtil.separator("handle()");
// 需求:将字符串转换为整数,转换失败的请忽略
// 实现3: 基于handle
Flux<Object> f5c = Flux.just("18", "Aaron", "69")
.handle((str, sink) -> {
try {
Integer num = Integer.parseInt(str);
// 转换成功,发出到下游
sink.next(num);
} catch (Exception ex) {
// 转换失败, 直接忽略
System.out.println("Str 2 Integer fail: "+ str);
}
});
ReactorLogUtil.log("f5c", f5c);
}

输出如下

1
2
3
-------------------- handle() --------------------
Str 2 Integer fail: Aaron
<Flux> f5c : [18, 69]

transform()

可以将共用的处理逻辑(若干个操作符)抽取、封装为一个可复用的单元,然后通过该函数对响应流Mono/Flux应用该处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws Exception {
ReactorLogUtil.separator("transform()");
Function<Mono<String>, Publisher<Integer>> str2IntConvert = mono -> {
return mono.map( Integer::parseInt )
.map( Math::abs )
.doOnSuccess( num -> System.out.println("convert success res: "+num))
.doOnError( ex-> System.out.println("convert error: "+ex.getMessage()))
.onErrorComplete();
};
Mono<Integer> mt1 = Mono.just("-69")
.transform(str2IntConvert);
Mono<Integer> mt2 = Mono.just("Tina")
.transform(str2IntConvert);
ReactorLogUtil.log("mt1", mt1);
ReactorLogUtil.log("mt2", mt2);

Function<Flux<String>, Flux<String>> strConvert = flux -> {
return flux.filter( str -> str.length()>3 )
.map( String::toUpperCase );
};
Flux<String> ft1 = Flux.just("Bob", "Aaron", "Tim", "Jack", "Tony")
.transform(strConvert);
ReactorLogUtil.log("ft1", ft1);
}

输出如下

1
2
3
4
5
6
-------------------- transform() --------------------
convert success res: 69
<Mono> mt1 : 69
convert error: For input string: "Tina"
<Mono> mt2 : <Empty Mono>
<Flux> ft1 : [AARON, JACK, TONY]

transformDeferred()

transform操作符接收的Function在声明、装配就会被调用。故该function只会被调用一次。这样,后续所有订阅者使用的都是同一个操作符链。由于操作符链是共享的,故封装在transform内的逻辑必须是无状态的。否则,多个订阅者之间会出现状态污染。而transformDeferred操作符接收的Function每次只有在被订阅时才会被调用,体现出名字中Deferred的延迟含义。这样,每次订阅时都会为该部分操作链动态生成一个全新的、独立的实例。适用于transformDeferred中封装的逻辑是有状态的。典型的场景有:每个订阅者需要记录各自的重试次数。需要注意的是:只有transformDeferred接收的Function中所定义的那部分操作符链才是为每个订阅者单独创建的。而transformDeferred之外的的其他操作符,依然是所有订阅者共享的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws Exception {
ReactorLogUtil.separator("transformDeferred()");

Function<Flux<String>, Flux<String>> func = flux -> {
System.out.println("Func Called: 实例化retryCount");
AtomicInteger retryCount = new AtomicInteger(1);
return flux.map(str -> {
int count = retryCount.getAndIncrement();
return "Count->" + count + " Item->" + str;
});
};

Flux<String> fta = Flux.just("Bob", "Aaron", "Tim")
.transform( func );
System.out.println("Test transform");
ReactorLogUtil.log("第1次订阅: fta", fta);
ReactorLogUtil.log("第2次订阅: fta", fta);

Flux<String> ftb = Flux.just("Bob", "Aaron", "Tim")
.transformDeferred( func );
System.out.println("\nTest transformDeferred");
ReactorLogUtil.log("第1次订阅: ftb", ftb);
ReactorLogUtil.log("第2次订阅: ftb", ftb);
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
-------------------- transformDeferred() --------------------
Func Called: 实例化retryCount
Test transform
<Flux> 第1次订阅: fta : [Count->1 Item->Bob, Count->2 Item->Aaron, Count->3 Item->Tim]
<Flux> 第2次订阅: fta : [Count->4 Item->Bob, Count->5 Item->Aaron, Count->6 Item->Tim]

Test transformDeferred
Func Called: 实例化retryCount
<Flux> 第1次订阅: ftb : [Count->1 Item->Bob, Count->2 Item->Aaron, Count->3 Item->Tim]
Func Called: 实例化retryCount
<Flux> 第2次订阅: ftb : [Count->1 Item->Bob, Count->2 Item->Aaron, Count->3 Item->Tim]

groupBy()

将Flux中的元素根据key分发到不同的GroupedFlux分组流中。可通过key()方法获取分组键,常用collectList()方法收集分组结果

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) throws Exception {
ReactorLogUtil.separator("groupBy()");
Flux<Entry<String, List<Integer>>> fg1 = Flux.just(3, 18, 21, 77, 33, 69, 24, 35)
.groupBy(num -> num % 2 == 0 ? "Even" : "Odd")
.flatMap( groupedFlux -> {
// 获取分组Key
String key = groupedFlux.key();
Mono<List<Integer>> listMono = groupedFlux.collectList();
Mono<Entry<String, List<Integer>>> entryMono = listMono.map(nums -> Map.entry(key, nums));
return entryMono;
} );
ReactorLogUtil.log("fg1", fg1);
}

输出如下

1
2
-------------------- groupBy() --------------------
<Flux> fg1 : [Even=[18, 24], Odd=[3, 21, 77, 33, 69, 35]]

window()

将Flux中的元素按一定规则(元素数量/时间间隔等)收集到不同的窗口中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) throws Exception {
ReactorLogUtil.separator("window()");

Flux<List<Integer>> fw1 = Flux.just(27, 32, 12, 99, 45, 31, 12, 75, 87, 56)
// 固定窗口: 每3个元素切分一次
.window(3)
.flatMap(numFlux -> numFlux.collectList());
ReactorLogUtil.log("fw1", fw1);

Flux<List<Integer>> fw2 = Flux.just(27, 32, 12, 99, 45, 31, 12)
.delayElements(Duration.ofMillis(30))
// 固定窗口: 每50ms切分一次
.window( Duration.ofMillis(50) )
.flatMap( numFlux -> numFlux.collectList() );
ReactorLogUtil.log("fw2", fw2);
}

输出如下

1
2
3
-------------------- window() --------------------
<Flux> fw1 : [[27, 32, 12], [99, 45, 31], [12, 75, 87], [56]]
<Flux> fw2 : [[27], [32], [12, 99], [45], [31, 12]]

window(int maxSize, int skip) 版本:maxSize参数表示每个窗口的最大元素数量; skip表示创建新窗口起始点的步长。即: 从当前窗口的起始点到下一个窗口的起始点需要前进多少个元素

具体地:

  • skip = maxSize: 每次创建新窗口时,上一个窗口都因为元素正好满了而关闭了。这样窗口一个接一个,不重叠、不遗漏。等同于window(maxSize)。即:固定窗口/滚动窗口
  • skip < maxSize: 步长 小于 窗口大小,导致窗口产生重叠。除第一个窗口外,每个新窗口的开头都会包含了前一个窗口末尾的(maxSize - skip)个元素。即:滑动窗口
  • skip > maxSize: 步长 大于 窗口大小,导致窗口之间产生间隙。连续的两个窗口中间会跳过、丢弃(skip-maxSize)个元素。即:跳跃窗口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) throws Exception {
ReactorLogUtil.separator("window()");

Flux<List<Integer>> fwa = Flux.range(1,9)
// 固定窗口
.window(5, 5)
.flatMap( numFlux -> numFlux.collectList() );

Flux<List<Integer>> fwb = Flux.range(1,9)
// 滑动窗口
.window(5,4)
.flatMap( numFlux -> numFlux.collectList() );

Flux<List<Integer>> fwc = Flux.range(1,12)
// 跳跃窗口
.window(3,5)
.flatMap( numFlux -> numFlux.collectList() );

ReactorLogUtil.log("fwa", fwa);
ReactorLogUtil.log("fwb", fwb);
ReactorLogUtil.log("fwc", fwc);
}

输出如下

1
2
3
4
-------------------- window() --------------------
<Flux> fwa : [[1, 2, 3, 4, 5], [6, 7, 8, 9]]
<Flux> fwb : [[1, 2, 3, 4, 5], [5, 6, 7, 8, 9], [9]]
<Flux> fwc : [[1, 2, 3], [6, 7, 8], [11, 12]]

windowUntil()/windowWhile()

windowUntil操作符:一直收集元素到一个窗口中,直到某个元素满足指定条件后; 此时,将该元素继续收集到当前窗口,随后立即关闭当前窗口。然后,立即打开一个新的窗口

windowWhile操作符:只要满足条件,就一直把元素收集到当前窗口。当某个元素不满足条件时,当前窗口会立即关闭。同时该元素会被丢失。值得注意的是,对于连续的不满足条件的元素,可能会产生多个空窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws Exception {
ReactorLogUtil.separator("windowUntil()/windowWhile()");
Flux<List<?>> fw4 = Flux.just(45, 42, "END","END", 18, 17, 12, "END", 22, 29, "END")
.windowUntil(event -> "END".equals(event))
.flatMap(eventFlux -> eventFlux.collectList());
ReactorLogUtil.log("fw4", fw4);

Flux<List<Integer>> fw5a = Flux.just(1, 24, 18, 21, 66, 3, 5, 34, 44, 40,13,15,17)
.windowWhile(num -> num % 2 == 0)
.flatMap(Flux::collectList);
ReactorLogUtil.log("fw5a", fw5a);

Flux<List<Integer>> fw5b = fw5a.filter(list -> !list.isEmpty());
ReactorLogUtil.log("fw5b", fw5b);
}

输出如下

1
2
3
4
-------------------- windowUntil()/windowWhile() --------------------
<Flux> fw4 : [[45, 42, END], [END], [18, 17, 12, END], [22, 29, END]]
<Flux> fw5a : [[], [24, 18], [66], [], [34, 44, 40], [], []]
<Flux> fw5b : [[24, 18], [66], [34, 44, 40]]

scan()

其对流中的每个元素依次应用累加器函数,并发出所有中间结果,从而生成一个新的流。其与reduce操作符非常相似,但区别在于后者只会向下游发出最终的累积结果;而前者则会下游发出所有的中间累积结果

scan(BiFunction accumulator)版本:

  • 首先, 直接将流中的第一个元素作为累计值acc的初值; 同时,向下游发出该累计值acc
  • 然后,从流中的第二个元素开始,将其作为cur当前元素。cur、acc作为输入,计算得到新的acc; 同时,向下游发出这个新的acc累计值
  • 重复上述过程,直到流结束
1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws Exception {
ReactorLogUtil.separator("scan()");
Flux<Integer> fs1 = Flux.just(3,2,7,8,4)
// 求和
.scan( (acc, cur) -> {
int res = acc + cur;
System.out.println("acc:"+acc+" cur:"+cur+" res:"+res);
return res;
});
ReactorLogUtil.log("fs1", fs1);
}

输出如下

1
2
3
4
5
6
-------------------- scan() --------------------
acc:3 cur:2 res:5
acc:5 cur:7 res:12
acc:12 cur:8 res:20
acc:20 cur:4 res:24
<Flux> fs1 : [3, 5, 12, 20, 24]

scan(A initial, BiFunction accumulator)版本:

  • 首先, 将initial作为acc累计值的初值;同时,向下游发出该累计值acc
  • 然后,从流中的第一个元素开始,将其作为cur当前元素。cur、acc作为输入,计算得到新的acc; 同时,向下游发出这个新的acc累计值
  • 重复上述过程,直到流结束
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) throws Exception {
ReactorLogUtil.separator("scan()");
Flux<Integer> fs2 = Flux.just("Jack", "Aaron", "tom", "gg")
.scan( 800, (acc, cur) -> {
int res = acc + cur.length();
System.out.println("acc:"+acc+" cur:"+cur+" res:"+res);
return res;
});
ReactorLogUtil.log("fs2", fs2);
}

输出如下

1
2
3
4
5
6
-------------------- scan() --------------------
acc:800 cur:Jack res:804
acc:804 cur:Aaron res:809
acc:809 cur:tom res:812
acc:812 cur:gg res:814
<Flux> fs2 : [800, 804, 809, 812, 814]
请我喝杯咖啡捏~

欢迎关注我的微信公众号:青灯抽丝