这里介绍Spring WebFlux中的转换操作符

打印工具
为了方便可视化,辅助打印工具如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| import java.time.Duration; import java.util.List; import java.util.Optional; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;
public class ReactorLogUtil { private static Duration timeout = Duration.ofSeconds(60);
private static final String separator = "-".repeat(20);
public static <T> void log(String varName, Mono<T> mono) { Object res = null; try { res = mono.block(timeout); }catch (Exception ex) { String msg = Optional.ofNullable(ex.getMessage()) .map(String::toLowerCase) .orElse(""); if( msg.contains("timeout") ) { res = "<Timeout>"; } else { res = "<Throw "+ex.toString()+">"; } }
String msg = new StringBuilder() .append("<Mono> ") .append(varName) .append(" : ") .append( Optional.ofNullable(res).orElse("<Empty Mono>") ) .toString(); System.out.println(msg); }
public static <T> void log(String varName, Flux<T> flux) { String res = null; try{ Mono<List<T>> listMono = flux.collectList(); List<T> list = listMono.block(timeout); res = list.toString(); }catch (Exception ex) { if( ex.getMessage().toLowerCase().contains("timeout") ) { res = "<Timeout>"; } else { res = "<Throw "+ex.toString()+">"; } } String msg = new StringBuilder() .append("<Flux> ") .append(varName) .append(" : ") .append( res ) .toString(); System.out.println(msg); }
public static void separator(String title) { System.out.println("\n"+separator+" "+title+" "+separator); } }
|
map()
将每个元素T通过一个函数串行的、同步的转换为一个新元素R。故该函数中不可以存在被阻塞的情形,例如:网络请求、数据库查询等
1 2 3 4 5 6 7 8 9 10 11
| public static void main(String[] args) throws Exception{ ReactorLogUtil.separator("map()");
Mono<Integer> m1 = Mono.just("69") .map(str -> Integer.valueOf(str)); ReactorLogUtil.log("m1", m1);
Flux<String> f1 = Flux.just("Tina", "David", "Bob") .map(str->str.length()<=3 ? str.toLowerCase() : str.toUpperCase()); ReactorLogUtil.log("f1", f1); }
|
输出如下
1 2 3
| -------------------- map() -------------------- <Mono> m1 : 69 <Flux> f1 : [TINA, DAVID, bob]
|
flatMap()
将各元素T通过一个函数并行的映射为新的响应式流 Mono / Flux。这个过程是异步的,当响应式流返回结果时,flatMap内部会自动进行压平,然后将结果元素R推到下游。特别地:对Flux使用flatMap,如果响应式流返回的是一个空Mono/Flux,其会直接忽略。显然,由于是并行的,故无法保证 输出结果的顺序 与 输入元素的顺序 是一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public static void main(String[] args) throws Exception{ ReactorLogUtil.separator("flatMap()");
Mono<String> m1 = Mono.just("David") .flatMap(name -> getAge(name)); ReactorLogUtil.log("m1", m1);
Flux<String> f1 = Flux.just("Tina", "Aaron", "Bob", "Amy") .flatMap(name -> getAge(name)); ReactorLogUtil.log("f1", f1); }
private static Mono<String> getAge(String name){ System.out.println(LocalTime.now() + " getAge param: " + name);
if( "Amy".equals(name) ) { return Mono.empty(); }
Integer age = null; Long delayTime = null; switch (name) { case "Aaron" -> { age=3; delayTime = 3L; } case "Bob" -> { age = 1; delayTime = 1L; } case "Tina" -> { age = 5; delayTime = 5L; } default -> { age = 0; delayTime = 0L; } } return Mono.just(name + ">>>" + age) .delayElement( Duration.ofSeconds(delayTime) ); }
|
输出如下
1 2 3 4 5 6 7 8
| -------------------- flatMap() -------------------- 16:47:41.624932 getAge param: David <Mono> m1 : David>>>0 16:47:41.653836 getAge param: Tina 16:47:41.654409 getAge param: Aaron 16:47:41.654605 getAge param: Bob 16:47:41.654708 getAge param: Amy <Flux> f1 : [Bob>>>1, Aaron>>>3, Tina>>>5]
|
concatMap()
flatMap()的串行版本。可保证输出结果的顺序与输入元素的顺序一致。即拿到第一个元素T1的结果R1;再发起第二个元素T2的转换,直到拿到结果R2后,才发起第三个元素T3的转换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public static void main(String[] args) throws Exception{ ReactorLogUtil.separator("concatMap()"); Flux<String> f1 = Flux.just("Tina", "Amy", "Aaron", "Bob") .concatMap(name -> getAge(name)); ReactorLogUtil.log("f1", f1); }
private static Mono<String> getAge(String name){ System.out.println(LocalTime.now() + " getAge param: " + name);
if( "Amy".equals(name) ) { return Mono.empty(); }
Integer age = null; Long delayTime = null; switch (name) { case "Aaron" -> { age=3; delayTime = 3L; } case "Bob" -> { age = 1; delayTime = 1L; } case "Tina" -> { age = 5; delayTime = 5L; } default -> { age = 0; delayTime = 0L; } } return Mono.just(name + ">>>" + age) .delayElement( Duration.ofSeconds(delayTime) ); }
|
输出如下
1 2 3 4 5 6
| -------------------- concatMap() -------------------- 16:44:11.823970 getAge param: Tina 16:44:16.862611 getAge param: Amy 16:44:16.863213 getAge param: Aaron 16:44:19.868758 getAge param: Bob <Flux> f1 : [Tina>>>5, Aaron>>>3, Bob>>>1]
|
flatMapSequential()
既具备flatMap()的优点:并发执行;又具备concatMap()的优点:按序输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public static void main(String[] args) throws Exception{ ReactorLogUtil.separator("flatMapSequential()"); Flux<String> f1 = Flux.just("Tina","Amy", "Aaron", "Bob") .flatMapSequential(name -> getAge(name)); ReactorLogUtil.log("f1", f1); }
private static Mono<String> getAge(String name){ System.out.println(LocalTime.now() + " getAge param: " + name);
if( "Amy".equals(name) ) { return Mono.empty(); }
Integer age = null; Long delayTime = null; switch (name) { case "Aaron" -> { age=3; delayTime = 3L; } case "Bob" -> { age = 1; delayTime = 1L; } case "Tina" -> { age = 5; delayTime = 5L; } default -> { age = 0; delayTime = 0L; } } return Mono.just(name + ">>>" + age) .delayElement( Duration.ofSeconds(delayTime) ); }
|
输出如下
1 2 3 4 5 6
| -------------------- flatMapSequential() -------------------- 16:46:26.446800 getAge param: Tina 16:46:26.465326 getAge param: Amy 16:46:26.465597 getAge param: Aaron 16:46:26.465878 getAge param: Bob <Flux> f1 : [Tina>>>5, Aaron>>>3, Bob>>>1]
|
cast()
将每个元素强制转换为指定类型,无法转换会发生ClassCastException错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws Exception{ ReactorLogUtil.separator("cast()");
Mono<Integer> mc1 = Mono.just(18) .cast(Integer.class); Mono<Double> mc2 = Mono.just(18) .cast(Double.class); ReactorLogUtil.log("mc1", mc1); ReactorLogUtil.log("mc2", mc2);
Flux<Integer> fc1 = Flux.just(2,3) .cast(Integer.class); Flux<Long> fc2 = Flux.just(4,"car") .cast(Long.class); ReactorLogUtil.log("fc1", fc1); ReactorLogUtil.log("fc2", fc2); }
|
输出如下
1 2 3 4 5
| -------------------- cast() -------------------- <Mono> mc1 : 18 <Mono> mc2 : <Throw java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.Double> <Flux> fc1 : [2, 3] <Flux> fc2 : <Throw java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.Long>
|
handle()
其既具备同步转换元素的能力,也可以过滤筛选元素。相当于map+filter的组合
例如,我们期望将字符串转换为整数,转换失败的请忽略。第1种实现方式,使用map进行转换, 转换失败时需返回null。由于onNext 信号传递的元素不能为null,故其会发出NPE的onError信号。换言之,通过map操作符无法实现 1到0(1->0) 的映射
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws Exception{ ReactorLogUtil.separator("handle()"); Flux<Integer> f5a = Flux.just("18", "Aaron", "69") .map(str -> { Integer num = null; try { num = Integer.parseInt(str); } catch (Exception ex) { System.out.println("Str 2 Integer fail: "+ str); } return num; }); ReactorLogUtil.log("f5a", f5a); }
|
输出如下
1 2 3
| -------------------- handle() -------------------- Str 2 Integer fail: Aaron <Flux> f5a : <Throw java.lang.NullPointerException: The mapper [com.aaron.reactive.service.operator.OperatorDemo$$Lambda$25/0x0000000136082798] returned a null value.>
|
第2种实现方式:使用flatMap进行转换。转换失败时, 通过返回空Mono 来实现 1到0(1->0) 的映射
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws Exception{ ReactorLogUtil.separator("handle()"); Flux<Integer> f5b = Flux.just("18", "Aaron", "69") .flatMap(str -> { Integer num = null; try { num = Integer.parseInt(str); } catch (Exception ex) { System.out.println("Str 2 Integer fail: " + str); } return Mono.justOrEmpty(num); }); ReactorLogUtil.log("f5b", f5b); }
|
输出如下
1 2 3
| -------------------- handle() -------------------- Str 2 Integer fail: Aaron <Flux> f5b : [18, 69]
|
第3种方式通过handle来实现。显然,使用它来实现上述需求 比上面的基于flatMap的第2种实现方式更合适,因为后者是一个较重的异步操作。其中,SynchronousSink 的常用方法:
- next(): 发出一个元素到下游。反之,如果需要过滤掉该元素,什么都不做就可以了。值得注意的是,对于每一个上游流过来的元素, sink.next()最多只能被执行一次。即其支持的映射是 1 到 0或1 (1 -> 0或1)
- error(): 发出错误以终止流
- complete(): 正常终止流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws Exception{ ReactorLogUtil.separator("handle()"); Flux<Object> f5c = Flux.just("18", "Aaron", "69") .handle((str, sink) -> { try { Integer num = Integer.parseInt(str); sink.next(num); } catch (Exception ex) { System.out.println("Str 2 Integer fail: "+ str); } }); ReactorLogUtil.log("f5c", f5c); }
|
输出如下
1 2 3
| -------------------- handle() -------------------- Str 2 Integer fail: Aaron <Flux> f5c : [18, 69]
|
可以将共用的处理逻辑(若干个操作符)抽取、封装为一个可复用的单元,然后通过该函数对响应流Mono/Flux应用该处理逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static void main(String[] args) throws Exception { ReactorLogUtil.separator("transform()"); Function<Mono<String>, Publisher<Integer>> str2IntConvert = mono -> { return mono.map( Integer::parseInt ) .map( Math::abs ) .doOnSuccess( num -> System.out.println("convert success res: "+num)) .doOnError( ex-> System.out.println("convert error: "+ex.getMessage())) .onErrorComplete(); }; Mono<Integer> mt1 = Mono.just("-69") .transform(str2IntConvert); Mono<Integer> mt2 = Mono.just("Tina") .transform(str2IntConvert); ReactorLogUtil.log("mt1", mt1); ReactorLogUtil.log("mt2", mt2);
Function<Flux<String>, Flux<String>> strConvert = flux -> { return flux.filter( str -> str.length()>3 ) .map( String::toUpperCase ); }; Flux<String> ft1 = Flux.just("Bob", "Aaron", "Tim", "Jack", "Tony") .transform(strConvert); ReactorLogUtil.log("ft1", ft1); }
|
输出如下
1 2 3 4 5 6
| -------------------- transform() -------------------- convert success res: 69 <Mono> mt1 : 69 convert error: For input string: "Tina" <Mono> mt2 : <Empty Mono> <Flux> ft1 : [AARON, JACK, TONY]
|
transform操作符接收的Function在声明、装配就会被调用。故该function只会被调用一次。这样,后续所有订阅者使用的都是同一个操作符链。由于操作符链是共享的,故封装在transform内的逻辑必须是无状态的。否则,多个订阅者之间会出现状态污染。而transformDeferred操作符接收的Function每次只有在被订阅时才会被调用,体现出名字中Deferred的延迟含义。这样,每次订阅时都会为该部分操作链动态生成一个全新的、独立的实例。适用于transformDeferred中封装的逻辑是有状态的。典型的场景有:每个订阅者需要记录各自的重试次数。需要注意的是:只有transformDeferred接收的Function中所定义的那部分操作符链才是为每个订阅者单独创建的。而transformDeferred之外的的其他操作符,依然是所有订阅者共享的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static void main(String[] args) throws Exception { ReactorLogUtil.separator("transformDeferred()"); Function<Flux<String>, Flux<String>> func = flux -> { System.out.println("Func Called: 实例化retryCount"); AtomicInteger retryCount = new AtomicInteger(1); return flux.map(str -> { int count = retryCount.getAndIncrement(); return "Count->" + count + " Item->" + str; }); };
Flux<String> fta = Flux.just("Bob", "Aaron", "Tim") .transform( func ); System.out.println("Test transform"); ReactorLogUtil.log("第1次订阅: fta", fta); ReactorLogUtil.log("第2次订阅: fta", fta);
Flux<String> ftb = Flux.just("Bob", "Aaron", "Tim") .transformDeferred( func ); System.out.println("\nTest transformDeferred"); ReactorLogUtil.log("第1次订阅: ftb", ftb); ReactorLogUtil.log("第2次订阅: ftb", ftb); }
|
输出如下
1 2 3 4 5 6 7 8 9 10 11
| -------------------- transformDeferred() -------------------- Func Called: 实例化retryCount Test transform <Flux> 第1次订阅: fta : [Count->1 Item->Bob, Count->2 Item->Aaron, Count->3 Item->Tim] <Flux> 第2次订阅: fta : [Count->4 Item->Bob, Count->5 Item->Aaron, Count->6 Item->Tim]
Test transformDeferred Func Called: 实例化retryCount <Flux> 第1次订阅: ftb : [Count->1 Item->Bob, Count->2 Item->Aaron, Count->3 Item->Tim] Func Called: 实例化retryCount <Flux> 第2次订阅: ftb : [Count->1 Item->Bob, Count->2 Item->Aaron, Count->3 Item->Tim]
|
groupBy()
将Flux中的元素根据key分发到不同的GroupedFlux分组流中。可通过key()方法获取分组键,常用collectList()方法收集分组结果
1 2 3 4 5 6 7 8 9 10 11 12 13
| public static void main(String[] args) throws Exception { ReactorLogUtil.separator("groupBy()"); Flux<Entry<String, List<Integer>>> fg1 = Flux.just(3, 18, 21, 77, 33, 69, 24, 35) .groupBy(num -> num % 2 == 0 ? "Even" : "Odd") .flatMap( groupedFlux -> { String key = groupedFlux.key(); Mono<List<Integer>> listMono = groupedFlux.collectList(); Mono<Entry<String, List<Integer>>> entryMono = listMono.map(nums -> Map.entry(key, nums)); return entryMono; } ); ReactorLogUtil.log("fg1", fg1); }
|
输出如下
1 2
| -------------------- groupBy() -------------------- <Flux> fg1 : [Even=[18, 24], Odd=[3, 21, 77, 33, 69, 35]]
|
window()
将Flux中的元素按一定规则(元素数量/时间间隔等)收集到不同的窗口中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public static void main(String[] args) throws Exception { ReactorLogUtil.separator("window()"); Flux<List<Integer>> fw1 = Flux.just(27, 32, 12, 99, 45, 31, 12, 75, 87, 56) .window(3) .flatMap(numFlux -> numFlux.collectList()); ReactorLogUtil.log("fw1", fw1);
Flux<List<Integer>> fw2 = Flux.just(27, 32, 12, 99, 45, 31, 12) .delayElements(Duration.ofMillis(30)) .window( Duration.ofMillis(50) ) .flatMap( numFlux -> numFlux.collectList() ); ReactorLogUtil.log("fw2", fw2); }
|
输出如下
1 2 3
| -------------------- window() -------------------- <Flux> fw1 : [[27, 32, 12], [99, 45, 31], [12, 75, 87], [56]] <Flux> fw2 : [[27], [32], [12, 99], [45], [31, 12]]
|
window(int maxSize, int skip) 版本:maxSize参数表示每个窗口的最大元素数量; skip表示创建新窗口起始点的步长。即: 从当前窗口的起始点到下一个窗口的起始点需要前进多少个元素
具体地:
- skip = maxSize: 每次创建新窗口时,上一个窗口都因为元素正好满了而关闭了。这样窗口一个接一个,不重叠、不遗漏。等同于window(maxSize)。即:固定窗口/滚动窗口
- skip < maxSize: 步长 小于 窗口大小,导致窗口产生重叠。除第一个窗口外,每个新窗口的开头都会包含了前一个窗口末尾的(maxSize - skip)个元素。即:滑动窗口
- skip > maxSize: 步长 大于 窗口大小,导致窗口之间产生间隙。连续的两个窗口中间会跳过、丢弃(skip-maxSize)个元素。即:跳跃窗口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public static void main(String[] args) throws Exception { ReactorLogUtil.separator("window()");
Flux<List<Integer>> fwa = Flux.range(1,9) .window(5, 5) .flatMap( numFlux -> numFlux.collectList() );
Flux<List<Integer>> fwb = Flux.range(1,9) .window(5,4) .flatMap( numFlux -> numFlux.collectList() );
Flux<List<Integer>> fwc = Flux.range(1,12) .window(3,5) .flatMap( numFlux -> numFlux.collectList() );
ReactorLogUtil.log("fwa", fwa); ReactorLogUtil.log("fwb", fwb); ReactorLogUtil.log("fwc", fwc); }
|
输出如下
1 2 3 4
| -------------------- window() -------------------- <Flux> fwa : [[1, 2, 3, 4, 5], [6, 7, 8, 9]] <Flux> fwb : [[1, 2, 3, 4, 5], [5, 6, 7, 8, 9], [9]] <Flux> fwc : [[1, 2, 3], [6, 7, 8], [11, 12]]
|
windowUntil()/windowWhile()
windowUntil操作符:一直收集元素到一个窗口中,直到某个元素满足指定条件后; 此时,将该元素继续收集到当前窗口,随后立即关闭当前窗口。然后,立即打开一个新的窗口
windowWhile操作符:只要满足条件,就一直把元素收集到当前窗口。当某个元素不满足条件时,当前窗口会立即关闭。同时该元素会被丢失。值得注意的是,对于连续的不满足条件的元素,可能会产生多个空窗口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void main(String[] args) throws Exception { ReactorLogUtil.separator("windowUntil()/windowWhile()"); Flux<List<?>> fw4 = Flux.just(45, 42, "END","END", 18, 17, 12, "END", 22, 29, "END") .windowUntil(event -> "END".equals(event)) .flatMap(eventFlux -> eventFlux.collectList()); ReactorLogUtil.log("fw4", fw4);
Flux<List<Integer>> fw5a = Flux.just(1, 24, 18, 21, 66, 3, 5, 34, 44, 40,13,15,17) .windowWhile(num -> num % 2 == 0) .flatMap(Flux::collectList); ReactorLogUtil.log("fw5a", fw5a);
Flux<List<Integer>> fw5b = fw5a.filter(list -> !list.isEmpty()); ReactorLogUtil.log("fw5b", fw5b); }
|
输出如下
1 2 3 4
| -------------------- windowUntil()/windowWhile() -------------------- <Flux> fw4 : [[45, 42, END], [END], [18, 17, 12, END], [22, 29, END]] <Flux> fw5a : [[], [24, 18], [66], [], [34, 44, 40], [], []] <Flux> fw5b : [[24, 18], [66], [34, 44, 40]]
|
scan()
其对流中的每个元素依次应用累加器函数,并发出所有中间结果,从而生成一个新的流。其与reduce操作符非常相似,但区别在于后者只会向下游发出最终的累积结果;而前者则会下游发出所有的中间累积结果
scan(BiFunction accumulator)版本:
- 首先, 直接将流中的第一个元素作为累计值acc的初值; 同时,向下游发出该累计值acc
- 然后,从流中的第二个元素开始,将其作为cur当前元素。cur、acc作为输入,计算得到新的acc; 同时,向下游发出这个新的acc累计值
- 重复上述过程,直到流结束
1 2 3 4 5 6 7 8 9 10 11
| public static void main(String[] args) throws Exception { ReactorLogUtil.separator("scan()"); Flux<Integer> fs1 = Flux.just(3,2,7,8,4) .scan( (acc, cur) -> { int res = acc + cur; System.out.println("acc:"+acc+" cur:"+cur+" res:"+res); return res; }); ReactorLogUtil.log("fs1", fs1); }
|
输出如下
1 2 3 4 5 6
| -------------------- scan() -------------------- acc:3 cur:2 res:5 acc:5 cur:7 res:12 acc:12 cur:8 res:20 acc:20 cur:4 res:24 <Flux> fs1 : [3, 5, 12, 20, 24]
|
scan(A initial, BiFunction accumulator)版本:
- 首先, 将initial作为acc累计值的初值;同时,向下游发出该累计值acc
- 然后,从流中的第一个元素开始,将其作为cur当前元素。cur、acc作为输入,计算得到新的acc; 同时,向下游发出这个新的acc累计值
- 重复上述过程,直到流结束
1 2 3 4 5 6 7 8 9 10
| public static void main(String[] args) throws Exception { ReactorLogUtil.separator("scan()"); Flux<Integer> fs2 = Flux.just("Jack", "Aaron", "tom", "gg") .scan( 800, (acc, cur) -> { int res = acc + cur.length(); System.out.println("acc:"+acc+" cur:"+cur+" res:"+res); return res; }); ReactorLogUtil.log("fs2", fs2); }
|
输出如下
1 2 3 4 5 6
| -------------------- scan() -------------------- acc:800 cur:Jack res:804 acc:804 cur:Aaron res:809 acc:809 cur:tom res:812 acc:812 cur:gg res:814 <Flux> fs2 : [800, 804, 809, 812, 814]
|