这里介绍Spring WebFlux中的组合/延迟操作符
打印工具 为了方便可视化,辅助打印工具如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 import java.time.Duration;import java.util.List;import java.util.Optional;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;public class ReactorLogUtil { private static Duration timeout = Duration.ofSeconds(60 ); private static final String separator = "-" .repeat(20 ); public static <T> void log (String varName, Mono<T> mono) { Object res = null ; try { res = mono.block(timeout); }catch (Exception ex) { String msg = Optional.ofNullable(ex.getMessage()) .map(String::toLowerCase) .orElse("" ); if ( msg.contains("timeout" ) ) { res = "<Timeout>" ; } else { res = "<Throw " +ex.toString()+">" ; } } String msg = new StringBuilder () .append("<Mono> " ) .append(varName) .append(" : " ) .append( Optional.ofNullable(res).orElse("<Empty Mono>" ) ) .toString(); System.out.println(msg); } public static <T> void log (String varName, Flux<T> flux) { String res = null ; try { Mono<List<T>> listMono = flux.collectList(); List<T> list = listMono.block(timeout); res = list.toString(); }catch (Exception ex) { if ( ex.getMessage().toLowerCase().contains("timeout" ) ) { res = "<Timeout>" ; } else { res = "<Throw " +ex.toString()+">" ; } } String msg = new StringBuilder () .append("<Flux> " ) .append(varName) .append(" : " ) .append( res ) .toString(); System.out.println(msg); } public static void separator (String title) { System.out.println("\n" +separator+" " +title+" " +separator); } }
组合操作符 merge()/mergeWith() 将多个Mono/Flux中的元素合并到一个Flux。元素不保证顺序, 先到达的排在前面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("merge()/mergeWith()" ); Mono<String> m1 = Mono.just("Tony" ) .delayElement(Duration.ofMillis(400 )); Mono<String> m2 = Mono.just("69" ) .delayElement(Duration.ofMillis(100 )); Flux<String> f1 = Flux.just("A" ,"B" ,"C" ) .delayElements(Duration.ofMillis(120 )); Flux<String> f2 = Flux.just("1" , "2" , "3" ) .delayElements(Duration.ofMillis(100 )); Flux<String> fm12 = m1.mergeWith(m2); ReactorLogUtil.log("fm12" , fm12); Flux<?> fm1 = Flux.merge(f1, f2); Flux<?> fm2 = f1.mergeWith(f2); ReactorLogUtil.log("f12" , fm1); ReactorLogUtil.log("f12" , fm2); }
输出如下
1 2 3 4 -------------------- merge()/mergeWith() -------------------- <Flux> fm12 : [69 , Tony] <Flux> f12 : [1 , A, 2 , B, 3 , C] <Flux> f12 : [1 , A, 2 , B, 3 , C]
concat()/concatWith() 将多个Mono/Flux中的元素合并到一个Flux中。元素顺序为指定的Mono/Flux拼接顺序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("concat()/concatWith()" ); Mono<String> m1 = Mono.just("Tony" ) .delayElement(Duration.ofMillis(400 )); Mono<String> m2 = Mono.just("69" ) .delayElement(Duration.ofMillis(100 )); Flux<String> f1 = Flux.just("A" ,"B" ,"C" ) .delayElements(Duration.ofMillis(120 )); Flux<String> f2 = Flux.just("1" , "2" , "3" ) .delayElements(Duration.ofMillis(100 )); Flux<String> fc12 = m1.concatWith(m2); ReactorLogUtil.log("fc12" , fc12); Flux<String> fc1 = Flux.concat(f1, f2); Flux<String> fc2 = f1.concatWith(f2); ReactorLogUtil.log("fc1" , fc1); ReactorLogUtil.log("fc2" , fc2); }
输出如下
1 2 3 4 -------------------- concat()/concatWith() -------------------- <Flux> fc12 : [Tony, 69 ] <Flux> fc1 : [A, B, C, 1 , 2 , 3 ] <Flux> fc2 : [A, B, C, 1 , 2 , 3 ]
zip()/zipWith() 将各Flux/Mono中对应索引位置的元素组合在一起。支持自定义组合逻辑, 默认组合为Tuple元组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("zip()/zipWith()" ); Mono<String> m1 = Mono.just("Tony" ) .delayElement(Duration.ofMillis(400 )); Mono<String> m2 = Mono.just("69" ) .delayElement(Duration.ofMillis(100 )); Flux<String> f1 = Flux.just("A" ,"B" ,"C" ) .delayElements(Duration.ofMillis(120 )); Flux<String> f2 = Flux.just("1" , "2" , "3" ) .delayElements(Duration.ofMillis(100 )); Mono<Tuple2<String, String>> mz1 = Mono.zip(m1, m2); Mono<String> mz2 = Mono.zip(m1, m2, (name, age)->name.toUpperCase()+" is " +age); ReactorLogUtil.log("mz1" , mz1); ReactorLogUtil.log("mz2" , mz2); Flux<Tuple2<String, String>> fz1 = Flux.zip(f1, f2); Flux<String> fz2 = f1.zipWith(f2, (f1e, f2e) -> f1e.toLowerCase() + f2e); ReactorLogUtil.log("fz1" , fz1); ReactorLogUtil.log("fz2" , fz2); }
输出如下
1 2 3 4 5 -------------------- zip()/zipWith() -------------------- <Mono> mz1 : [Tony,69 ] <Mono> mz2 : TONY is 69 <Flux> fz1 : [[A,1 ], [B,2 ], [C,3 ]] <Flux> fz2 : [a1, b2, c3]
defaultIfEmpty() 上游流为空时,返回默认值
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("defaultIfEmpty()" ); Flux<String> fd1 = Flux.just("Aaron" , "Bob" ) .defaultIfEmpty("Tina" ); Flux<?> fd2 = Flux.empty() .defaultIfEmpty("Tina" ); ReactorLogUtil.log("fd1" , fd1); ReactorLogUtil.log("fd2" , fd2); }
输出如下
1 2 3 -------------------- defaultIfEmpty() -------------------- <Flux> fd1 : [Aaron, Bob] <Flux> fd2 : [Tina]
switchIfEmpty() 上游流为空时,切换到备用流
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("switchIfEmpty()" ); Flux<String> fs1 = Flux.just("Aaron" , "Bob" ) .switchIfEmpty( Flux.just("11" , "22" ) ); Flux<?> fs2 = Flux.empty() .switchIfEmpty( Flux.just("11" , "22" ) ); ReactorLogUtil.log("fs1" , fs1); ReactorLogUtil.log("fs2" , fs2); }
输出如下
1 2 3 -------------------- switchIfEmpty() -------------------- <Flux> fs1 : [Aaron, Bob] <Flux> fs2 : [11 , 22 ]
switchOnFirst() 根据第一个元素决定,是使用原始流还是备用流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("switchOnFirst()" ); Flux<String> ff1 = Flux.just("Start" , "Data1" , "Data2" , "End" ) .switchOnFirst((signal, flux) -> { if ( signal.hasValue() && "Start" .equals(signal.get()) ){ return flux.map(String::toUpperCase); } else { return Flux.just("Error1" , "Error 2" ); } }); Flux<String> ff2 = Flux.just("Data1" , "Data2" , "End" ) .switchOnFirst((signal, flux) -> { if ( signal.hasValue() && "Start" .equals(signal.get()) ){ return flux.map(String::toUpperCase); } else { return Flux.just("Error1" , "Error 2" ); } }); ReactorLogUtil.log("ff1" , ff1); ReactorLogUtil.log("ff2" , ff2); }
输出如下
1 2 3 -------------------- switchOnFirst() -------------------- <Flux> ff1 : [START, DATA1, DATA2, END] <Flux> ff2 : [Error1, Error 2 ]
startWith() 在流的最前面添加指定元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("startWith()" ); Flux<String> fst1 = Flux.just("11" , "33" , "55" ) .startWith("Hello" , "World" ); Flux<String> fst2 = Flux.just("11" , "33" , "55" ) .startWith( List.of("-1" , "-2" ) ); Flux<String> fst3 = Flux.just("11" , "33" , "55" ) .startWith( Flux.just("Aaron" ,"Bob" ) ); ReactorLogUtil.log("fst1" , fst1); ReactorLogUtil.log("fst2" , fst2); ReactorLogUtil.log("fst3" , fst3); }
输出如下
1 2 3 4 -------------------- startWith() -------------------- <Flux> fst1 : [Hello, World, 11 , 33 , 55 ] <Flux> fst2 : [-1 , -2 , 11 , 33 , 55 ] <Flux> fst3 : [Aaron, Bob, 11 , 33 , 55 ]
then() 当收到上游的onComplete信号后,其会返回一个 空Mono 或 指定Mono; 如果上游出现了onError信号,其则会将该错误信号继续向下游传递。适用于不关心上游返回的数据,只关心是否正常完成的场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("then()" ); Mono<Void> ft1 = Flux.just("Bob" , "Tina" ) .map(String::toUpperCase) .doOnNext( str-> System.out.println("ft1 : " + str) ) .then(); ReactorLogUtil.log("ft1" , ft1); Mono<Void> ft2 = Flux.just("23" ,"Aaron" ) .map( Integer::parseInt ) .doOnNext( num-> System.out.println("ft2 : " + num) ) .then(); ReactorLogUtil.log("ft2" , ft2); Mono<String> ft3 = Flux.just("11" , "33" ) .map( Integer::parseInt ) .doOnNext( num-> System.out.println("ft3 : " + num) ) .then(Mono.just("Success" )); ReactorLogUtil.log("ft3" , ft3); Mono<String> ft4 = Flux.just("23" ,"Aaron" ) .map( Integer::parseInt ) .doOnNext( num-> System.out.println("ft4 : " + num) ) .then(Mono.just("Success" )); ReactorLogUtil.log("ft4" , ft4); }
输出如下
1 2 3 4 5 6 7 8 9 10 11 -------------------- then() -------------------- ft1 : BOB ft1 : TINA <Mono> ft1 : <Empty Mono> ft2 : 23 <Mono> ft2 : <Throw java.lang.NumberFormatException: For input string: "Aaron" > ft3 : 11 ft3 : 33 <Mono> ft3 : Success ft4 : 23 <Mono> ft4 : <Throw java.lang.NumberFormatException: For input string: "Aaron" >
thenEmpty() 当收到上游的onComplete信号后,其会执行另一个异步动作。且该异步动作最终返回的也是空Mono; 如果上游出现了onError信号,其则会将该错误信号继续向下游传递。适用于当上游正常结束后,需要执行其他任务的场景。例如:记录日志、发送通知等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("thenEmpty()" ); Mono<Void> fte1 = Flux.just("22" , "44" ) .map(Integer::parseInt) .doOnNext( num-> System.out.println("fte1 #1 : " + num) ) .thenEmpty( Mono.just("Clean Resource" ) .doOnNext(str -> System.out.println("fte1 #2 : " + str)) .then() ); ReactorLogUtil.log("fte1" , fte1); Mono<Void> fte2 = Flux.just("33" , "Aaron" ) .map(Integer::parseInt) .doOnNext( num-> System.out.println("fte2 #1 : " + num) ) .thenEmpty( Mono.just("Clean Resource" ) .doOnNext(str -> System.out.println("fte2 #2 : " + str)) .then() ); ReactorLogUtil.log("fte2" , fte2); }
输出如下
1 2 3 4 5 6 7 -------------------- thenEmpty() -------------------- fte1 #1 : 22 fte1 #1 : 44 fte1 #2 : Clean Resource <Mono> fte1 : <Empty Mono> fte2 #1 : 33 <Mono> fte2 : <Throw java.lang.NumberFormatException: For input string: "Aaron" >
thenMany() 当收到上游的onComplete信号后,其会执行另一个异步动作。且该异步动作最终返回的是一个Flux; 如果上游出现了onError信号,其则会将该错误信号继续向下游传递。适用于 当上游前置任务成功完成后,才可以进行下一步的业务动作。例如:前置的系统初始化、身份验证等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("thenMany()" ); Flux<Integer> ftm1 = Mono.just("Aaron" ) .map(String::toUpperCase) .doOnSuccess(name -> System.out.println(name + ": System Init Done" )) .thenMany(Flux.just("17" , "25" , "32" ) .map(Integer::parseInt) ); ReactorLogUtil.log("ftm1" , ftm1); Flux<Integer> ftm2 = Mono.just("Aaron" ) .map( Integer::parseInt ) .doOnSuccess(name -> System.out.println(name + ": System Init Done" )) .thenMany(Flux.just("69" ) .map(Integer::parseInt) ); ReactorLogUtil.log("ftm2" , ftm2); }
输出如下
1 2 3 4 -------------------- thenMany() -------------------- AARON: System Init Done <Flux> ftm1 : [17 , 25 , 32 ] <Flux> ftm2 : <Throw java.lang.NumberFormatException: For input string: "Aaron" >
延迟操作符 delaySubscription() 上游所有操作符(含自身)按 指定时间/另一个发布者的信号 来延迟订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("delaySubscription()" ); Flux.just("Tina" ) .log("#1" ) .delaySubscription( Duration.ofSeconds(4 ) ) .log("#2" ) .subscribe(); Thread.sleep(1000 *8 ); System.out.println("\n-----------------------------------\n" ); Mono.just("69" ) .log("#A" ) .delaySubscription( Mono.just("S" ) .delayElement(Duration.ofSeconds(5 )) .log(" sub publisher" ) ).log("#B" ) .subscribe(); Thread.sleep(1000 *8 ); }
输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 -------------------- delaySubscription() -------------------- 16 :08:14.308 [main] INFO #2 -- onSubscribe(FluxDelaySubscription.DelaySubscriptionOtherSubscriber)16 :08:14.314 [main] INFO #2 -- request(unbounded)16 :08:18.339 [parallel-1 ] INFO #1 -- | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)16 :08:18.340 [parallel-1 ] INFO #1 -- | request(unbounded)16 :08:18.341 [parallel-1 ] INFO #1 -- | onNext(Tina)16 :08:18.341 [parallel-1 ] INFO #2 -- onNext(Tina)16 :08:18.342 [parallel-1 ] INFO #1 -- | onComplete()16 :08:18.342 [parallel-1 ] INFO #2 -- onComplete()----------------------------------- 16 :08:22.331 [main] INFO sub publisher -- onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)16 :08:22.331 [main] INFO #B -- onSubscribe(FluxDelaySubscription.DelaySubscriptionOtherSubscriber)16 :08:22.331 [main] INFO #B -- request(unbounded)16 :08:22.331 [main] INFO sub publisher -- request(unbounded)16 :08:27.337 [parallel-2 ] INFO sub publisher -- onNext(S)16 :08:27.341 [parallel-2 ] INFO sub publisher -- cancel()16 :08:27.342 [parallel-2 ] INFO #A -- | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)16 :08:27.342 [parallel-2 ] INFO #A -- | request(unbounded)16 :08:27.342 [parallel-2 ] INFO #A -- | onNext(69 )16 :08:27.342 [parallel-2 ] INFO #B -- onNext(69 )16 :08:27.342 [parallel-2 ] INFO #A -- | onComplete()16 :08:27.343 [parallel-2 ] INFO #B -- onComplete()16 :08:27.343 [parallel-2 ] INFO sub publisher -- onComplete()
delayElement()/delayElements() 延迟Mono/Flux中每个元素(即onNext信号)的发出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("delayElement()/delayElements()" ); Mono.just("Tony" ) .log("delayElement #1" ) .map( String::toUpperCase ) .log("delayElement #2" ) .delayElement( Duration.ofSeconds(3 ) ) .log("delayElement #3" ) .subscribe(); Thread.sleep(1000 *5 ); System.out.println("\n-----------------------------------\n" ); Flux.just(11 ,22 ,33 ) .log("delayElements #A" ) .delayElements(Duration.ofSeconds(2 )) .log("delayElements #B" ) .subscribe(); Thread.sleep(1000 *8 ); }
输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 -------------------- delayElement()/delayElements() -------------------- 16 :10 :57.582 [main] INFO delayElement #1 -- | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)16 :10 :57.586 [main] INFO delayElement #2 -- | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)16 :10 :57.587 [main] INFO delayElement #3 -- onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)16 :10 :57.587 [main] INFO delayElement #3 -- request(unbounded)16 :10 :57.587 [main] INFO delayElement #2 -- | request(unbounded)16 :10 :57.588 [main] INFO delayElement #1 -- | request(unbounded)16 :10 :57.589 [main] INFO delayElement #1 -- | onNext(Tony)16 :10 :57.589 [main] INFO delayElement #2 -- | onNext(TONY)16 :10 :57.590 [main] INFO delayElement #1 -- | onComplete()16 :10 :57.591 [main] INFO delayElement #2 -- | onComplete()16 :11 :00.597 [parallel-1 ] INFO delayElement #3 -- onNext(TONY)16 :11 :00.599 [parallel-1 ] INFO delayElement #3 -- onComplete()----------------------------------- 16 :11 :02.669 [main] INFO delayElements #A -- | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)16 :11 :02.669 [main] INFO delayElements #B -- onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)16 :11 :02.669 [main] INFO delayElements #B -- request(unbounded)16 :11 :02.669 [main] INFO delayElements #A -- | request(1 )16 :11 :02.669 [main] INFO delayElements #A -- | onNext(11 )16 :11 :04.678 [parallel-2 ] INFO delayElements #B -- onNext(11 )16 :11 :04.679 [parallel-2 ] INFO delayElements #A -- | request(1 )16 :11 :04.679 [parallel-2 ] INFO delayElements #A -- | onNext(22 )16 :11 :06.684 [parallel-3 ] INFO delayElements #B -- onNext(22 )16 :11 :06.684 [parallel-3 ] INFO delayElements #A -- | request(1 )16 :11 :06.685 [parallel-3 ] INFO delayElements #A -- | onNext(33 )16 :11 :06.685 [parallel-3 ] INFO delayElements #A -- | onComplete()16 :11 :08.690 [parallel-4 ] INFO delayElements #B -- onNext(33 )16 :11 :08.691 [parallel-4 ] INFO delayElements #B -- onComplete()
delaySequence() 仅延迟第一个元素的发出时间,后续元素的发出按元素原有的时间间隔。相当于, 将响应式流的所有元素在时间轴上向右平移指定的时长, 而元素之间原有的时间间隔保持不变
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("delaySequence()" ); Flux.just(1 ,2 ,3 ) .delayElements(Duration.ofSeconds(4 )) .log("#1" ) .delaySequence( Duration.ofSeconds(10 ) ) .log("#2" ) .subscribe(); Thread.sleep(1000 *30 ); }
输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 -------------------- delaySequence() -------------------- 16 :17 :51.358 [main] INFO #1 -- onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)16 :17 :51.362 [main] INFO #2 -- onSubscribe(SerializedSubscriber)16 :17 :51.363 [main] INFO #2 -- request(unbounded)16 :17 :51.363 [main] INFO #1 -- request(unbounded)16 :17 :55.377 [parallel-1 ] INFO #1 -- onNext(1 )16 :17 :59.389 [parallel-3 ] INFO #1 -- onNext(2 )16 :18 :03.396 [parallel-4 ] INFO #1 -- onNext(3 )16 :18 :03.400 [parallel-4 ] INFO #1 -- onComplete()16 :18 :05.389 [parallel-2 ] INFO #2 -- onNext(1 )16 :18 :09.393 [parallel-2 ] INFO #2 -- onNext(2 )16 :18 :13.401 [parallel-2 ] INFO #2 -- onNext(3 )16 :18 :13.402 [parallel-2 ] INFO #2 -- onComplete()
delayUntil() 每个元素发出前,延迟等待一个异步操作完成
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("delayUntil()" ); Flux.just(1 ,2 ,3 ) .log("#1" ) .delayUntil( num -> Mono.just("gg" ).delayElement(Duration.ofSeconds(num))) .log("#2" ) .subscribe(); Thread.sleep(1000 *10 ); }
输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 -------------------- delayUntil() -------------------- 16 :21 :07.597 [main] INFO #1 -- | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)16 :21 :07.599 [main] INFO #2 -- onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)16 :21 :07.600 [main] INFO #2 -- request(unbounded)16 :21 :07.600 [main] INFO #1 -- | request(1 )16 :21 :07.600 [main] INFO #1 -- | onNext(1 )16 :21 :08.654 [parallel-1 ] INFO #2 -- onNext(1 )16 :21 :08.655 [parallel-1 ] INFO #1 -- | request(1 )16 :21 :08.656 [parallel-1 ] INFO #1 -- | onNext(2 )16 :21 :10.661 [parallel-2 ] INFO #2 -- onNext(2 )16 :21 :10.661 [parallel-2 ] INFO #1 -- | request(1 )16 :21 :10.661 [parallel-2 ] INFO #1 -- | onNext(3 )16 :21 :10.662 [parallel-2 ] INFO #1 -- | onComplete()16 :21 :13.667 [parallel-3 ] INFO #2 -- onNext(3 )16 :21 :13.667 [parallel-3 ] INFO #2 -- onComplete()