这里介绍Spring WebFlux中的创建、终端操作符
Marble Diagram 弹珠图 一个典型的Marble Diagram弹珠图由以下几个部分组成:
时间轴:一个水平向右的箭头 表示 随着时间的流逝,数据流中的事件从左到右依次发生
Marble弹珠:时间轴上方的各种不同颜色的形状(圆点、方块、三角形等)用于表示数据流发出元素的onNext事件。每个形状均代表一个Data Item数据项
垂直线(|):时间轴上的|垂直线 表示 数据流正常完成的onComplete事件
叉号(x):时间轴上的x叉号 表示 数据流发生错误的onError事件
操作符:一个矩形框。通常位于输入时间轴、输出时间轴之间。表示将上方的输入流转换为下方的输出流
输入流:操作符上方的时间轴 表示 操作符接收的原始数据流
输出流:操作符下方的时间轴 表示 操作符处理后产生的新数据流
下图是onErrorComplete操作符的Marble Diagram弹珠图
创建操作符 Mono、Flux是响应式编程最基础的概念。具体地:Mono用于表示含 0个或1个元素 的响应式流;Flux用于表示含 0个或多个元素 的响应式流。且二者都不允许元素为null。为了方便可视化,辅助打印工具如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 import java.time.Duration;import java.util.List;import java.util.Optional;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;public class ReactorLogUtil { private static Duration timeout = Duration.ofSeconds(60 ); private static final String separator = "-" .repeat(20 ); public static <T> void log (String varName, Mono<T> mono) { Object res = null ; try { res = mono.block(timeout); }catch (Exception ex) { String msg = Optional.ofNullable(ex.getMessage()) .map(String::toLowerCase) .orElse("" ); if ( msg.contains("timeout" ) ) { res = "<Timeout>" ; } else { res = "<Throw " +ex.toString()+">" ; } } String msg = new StringBuilder () .append("<Mono> " ) .append(varName) .append(" : " ) .append( Optional.ofNullable(res).orElse("<Empty Mono>" ) ) .toString(); System.out.println(msg); } public static <T> void log (String varName, Flux<T> flux) { String res = null ; try { Mono<List<T>> listMono = flux.collectList(); List<T> list = listMono.block(timeout); res = list.toString(); }catch (Exception ex) { if ( ex.getMessage().toLowerCase().contains("timeout" ) ) { res = "<Timeout>" ; } else { res = "<Throw " +ex.toString()+">" ; } } String msg = new StringBuilder () .append("<Flux> " ) .append(varName) .append(" : " ) .append( res ) .toString(); System.out.println(msg); } public static void separator (String title) { System.out.println("\n" +separator+" " +title+" " +separator); } }
just() 创建指定元素的Mono/Flux
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) { ReactorLogUtil.separator("just()" ); Mono<String> monoJ1 = Mono.just("Aaron" ); Flux<String> fluxJ1 = Flux.just("Tony" ); Flux<String> fluxJ2 = Flux.just("Cat" , "Dog" , "Human" ); Flux<String> fluxJ3 = Flux.just(); ReactorLogUtil.log("monoJ1" , monoJ1); ReactorLogUtil.log("fluxJ1" , fluxJ1); ReactorLogUtil.log("fluxJ2" , fluxJ2); ReactorLogUtil.log("fluxJ3" , fluxJ3); }
输出如下
1 2 3 4 5 -------------------- just() -------------------- <Mono> monoJ1 : Aaron <Flux> fluxJ1 : [Tony] <Flux> fluxJ2 : [Cat, Dog, Human] <Flux> fluxJ3 : []
defer()/fromSupplier() 当Mono/Flux真正被需要、订阅时,Supplier的get()方法才会被调用,即真正需要时才创建Mono/Flux
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) throws InterruptedException { ReactorLogUtil.separator("defer() / fromSupplier()" ); Mono<LocalTime> mt1 = Mono.just(LocalTime.now()); Mono<LocalTime> mt2 = Mono.defer( ()->Mono.just(LocalTime.now()) ); Mono<LocalTime> mt3 = Mono.fromSupplier(() -> LocalTime.now()); Flux<LocalTime> ft1 = Flux.just(LocalTime.now()); Flux<LocalTime> ft2 = Flux.defer( ()->Flux.just(LocalTime.now()) ); Thread.sleep(5 *1000 ); ReactorLogUtil.log("mt1" , mt1); ReactorLogUtil.log("mt2" , mt2); ReactorLogUtil.log("mt3" , mt3); System.out.println("\n" +"=" .repeat(50 )+"\n" ); ReactorLogUtil.log("ft1" , ft1); ReactorLogUtil.log("ft2" , ft2); }
输出如下
1 2 3 4 5 6 7 8 9 -------------------- defer() / fromSupplier() -------------------- <Mono> mt1 : 14 :03 :14.174406 <Mono> mt2 : 14 :03 :19.455076 <Mono> mt3 : 14 :03 :19.464879 ================================================== <Flux> ft1 : [14 :03 :14.358709 ] <Flux> ft2 : [14 :03 :19.466663 ]
empty() 创建空Mono/Flux(0个元素)
1 2 3 4 5 6 7 public static void main (String[] args) throws InterruptedException { ReactorLogUtil.separator("empty()" ); Mono<?> monoE1 = Mono.empty(); Flux<?> fluxE1 = Flux.empty(); ReactorLogUtil.log("monoE1" ,monoE1 ); ReactorLogUtil.log("fluxE1" , fluxE1); }
输出如下
1 2 3 -------------------- empty() -------------------- <Mono> monoE1 : <Empty Mono> <Flux> fluxE1 : []
justOrEmpty() 如果data不为null,则创建包含该元素的Mono;否则,创建空Mono
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("justOrEmpty()" ); Mono<Integer> mje1 = Mono.justOrEmpty(null ); Mono<Integer> mje2 = Mono.justOrEmpty(69 ); Mono<Integer> mje3 = Mono.justOrEmpty( Optional.ofNullable(null ) ); Mono<Integer> mje4 = Mono.justOrEmpty( Optional.of(119 ) ); ReactorLogUtil.log( "mje1" , mje1 ); ReactorLogUtil.log( "mje2" , mje2 ); ReactorLogUtil.log( "mje3" , mje3 ); ReactorLogUtil.log( "mje4" , mje4 ); }
输出如下
1 2 3 4 5 -------------------- justOrEmpty() -------------------- <Mono> mje1 : <Empty Mono> <Mono> mje2 : 69 <Mono> mje3 : <Empty Mono> <Mono> mje4 : 119
error() 创建不包含任何元素,直接发出onError信号的Mono/Flux
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("error()" ); Mono<Object> me1 = Mono.error(new RuntimeException ("服务异常" )); Mono<Object> me2 = Mono.error(() -> new IllegalArgumentException ("缺少主键" )); Flux<Object> fe1 = Flux.error( new RuntimeException ("资源有限" ) ); Flux<Object> fe2 = Flux.error(() -> new IllegalArgumentException ("缺少名称" )); ReactorLogUtil.log("me1" , me1); ReactorLogUtil.log("me2" , me2); ReactorLogUtil.log("fe1" , fe1); ReactorLogUtil.log("fe2" , fe2); }
输出如下
1 2 3 4 5 -------------------- error() -------------------- <Mono> me1 : <Throw java.lang.RuntimeException: 服务异常> <Mono> me2 : <Throw java.lang.IllegalArgumentException: 缺少主键> <Flux> fe1 : <Throw java.lang.RuntimeException: 资源有限> <Flux> fe2 : <Throw java.lang.IllegalArgumentException: 缺少名称>
never() 创建永远不会发出任何信号、元素的Mono/Flux。通常用于测试超时逻辑
1 2 3 4 5 6 7 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("never()" ); Mono<Object> mn1 = Mono.never(); Flux<Object> fn1 = Flux.never(); ReactorLogUtil.log("mn1" , mn1); ReactorLogUtil.log("fn1" , fn1); }
输出如下
1 2 3 -------------------- never() -------------------- <Mono> mn1 : <Timeout> <Flux> fn1 : <Timeout>
fromArray()/fromIterable()/fromStream() 从 数组/集合/Stream流 创建 Flux
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("fromXxx()" ); String[] names = {"张三" , "李四" , "王二麻子" }; Flux<String> ff1 = Flux.fromArray(names); ReactorLogUtil.log("ff1" , ff1); List<Integer> prices = List.of(37 ,18 ,22 ); Flux<Integer> ff2 = Flux.fromIterable(prices); Set<Integer> priceSet = Set.of(37 ,18 ,22 ); Flux<Integer> ff3 = Flux.fromIterable(priceSet); ReactorLogUtil.log("ff2" , ff2); ReactorLogUtil.log("ff3" , ff3); Stream<String> strStream = Stream.of("Tina" , "Lucy" , "Kimi" ); Flux<String> ff4 = Flux.fromStream(strStream); ReactorLogUtil.log("ff4" , ff4); }
输出如下
1 2 3 4 5 -------------------- fromXxx() -------------------- <Flux> ff1 : [张三, 李四, 王二麻子] <Flux> ff2 : [37 , 18 , 22 ] <Flux> ff3 : [22 , 37 , 18 ] <Flux> ff4 : [Tina, Lucy, Kimi]
range(start,count) 创建包含 [start, start+count) 范围下的count个元素的Flux
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("range()" ); Flux<Integer> fr1 = Flux.range(10 , 4 ); Flux<Integer> fr2 = Flux.range(3 , 0 ); Flux<Integer> fr3 = Flux.range(5 , 1 ); ReactorLogUtil.log("fr1" ,fr1); ReactorLogUtil.log("fr2" ,fr2); ReactorLogUtil.log("fr3" ,fr3); }
输出如下
1 2 3 4 -------------------- range() -------------------- <Flux> fr1 : [10 , 11 , 12 , 13 ] <Flux> fr2 : [] <Flux> fr3 : [5 ]
interval() 创建无限流Flux。其按指定间隔发出从0开始递增的元素
1 2 3 4 5 6 7 8 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("interval()" ); Flux<Long> fi1 = Flux.interval(Duration.ofSeconds(2 )) .take(4 ) .doOnNext(v-> System.out.println(LocalTime.now() + " -->> " +v)); ReactorLogUtil.log("fi1" ,fi1); }
输出如下
1 2 3 4 5 6 -------------------- interval() -------------------- 14 :25 :13.099309 -->> 0 14 :25 :15.097910 -->> 1 14 :25 :17.097782 -->> 2 14 :25 :19.096968 -->> 3 <Flux> fi1 : [0 , 1 , 2 , 3 ]
generate() 创建按指定逻辑生成、发出元素的Flux
generate(generator)版本:直接使用SynchronousSink的next()方法每次发出一个指定元素
1 2 3 4 5 6 7 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("generate()" ); Flux<Double> fg1 = Flux.generate( (SynchronousSink<Double> sink) -> sink.next( Math.random() ) ) .take(3 ); ReactorLogUtil.log("fg1" , fg1); }
输出如下
1 2 -------------------- generate() -------------------- <Flux> fg1 : [0.12924511843509945 , 0.3527570726338465 , 0.21543168691524528 ]
generate(stateSupplier, generator)版本:该版本支持状态管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("generate()" ); Flux<Long> fg2 = Flux.generate( () -> new long []{0 ,1 }, (long [] currentState, SynchronousSink<Long> sink) -> { sink.next( currentState[0 ] ); long newFirst = currentState[1 ]; long newSecond = currentState[0 ]+currentState[1 ]; long [] newState = new long []{newFirst, newSecond}; return newState; } ).take(10 ); ReactorLogUtil.log("fg2" , fg2); }
输出如下
1 2 -------------------- generate() -------------------- <Flux> fg2 : [0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]
delay() 创建一个Mono,其会在延迟指定时间后发出一个值为0的元素。常用作延时信号
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("delay()" ); System.out.println(LocalTime.now() + ": pre" ); Mono<Long> m1 = Mono.delay(Duration.ofSeconds(20 )); System.out.println(LocalTime.now() + ": wait" ); ReactorLogUtil.log("m1" , m1); System.out.println(LocalTime.now() + ": complete" ); }
输出如下
1 2 3 4 5 -------------------- delay() -------------------- 14 :39 :23.458566 : pre14 :39 :23.645992 : wait<Mono> m1 : 0 14 :39 :43.672555 : complete
终端操作符 中间操作符都是惰性的。只有终端操作符才会触发整个响应式流的执行。订阅信号会从消费者反向传播到源头的发布者,启动数据流动
subscribe() subscribe(): 订阅流并启动它,但不对流发出的任何元素、错误、完成信号进行显式处理
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("subscribe()" ); Flux.just(37 ,25 ) .doOnNext(num-> System.out.println("onNext: " + num)) .subscribe(); Thread.sleep(1000 *3 ); }
输出如下
1 2 3 -------------------- subscribe() -------------------- onNext: 37 onNext: 25
subscribe(Consumer<? super T> consumer): 订阅流并启动它。使用指定的consumer对流发出的元素进行处理。由于没有提供错误消费者,故如果流发出错误,则错误会被传播到Reactor的全局错误处理机制
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("subscribe()" ); Flux.just("18" ,"Test" ,"24" ) .map( Integer::parseInt ) .subscribe( num-> System.out.println("num: " +num) ); Thread.sleep(1000 *3 ); }
输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 -------------------- subscribe() -------------------- num: 18 15 :16 :41.404 [main] ERROR reactor.core.publisher.Operators -- Operator called default onErrorDroppedreactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NumberFormatException: For input string: "Test" Caused by: java.lang.NumberFormatException: For input string: "Test" at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:67 ) at java.base/java.lang.Integer.parseInt(Integer.java:668 ) at java.base/java.lang.Integer.parseInt(Integer.java:786 ) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113 ) at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171 ) at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96 ) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171 ) at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119 ) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96 ) at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53 ) ...
subscribe(consumer, errorConsumer): 订阅流并启动它。使用consumer、errorConsumer分别对发出的元素、错误进行处理
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("subscribe()" ); Flux.just("18" ,"Test" ,"24" ) .map( Integer::parseInt ) .subscribe( v -> System.out.println("num: " +v), e -> System.out.println("happen e: " +e) ); Thread.sleep(1000 *3 ); }
输出如下
1 2 3 -------------------- subscribe() -------------------- num: 18 happen e: java.lang.NumberFormatException: For input string: "Test"
subscribe(consumer, errorConsumer, completeConsumer): 订阅流并启动它。使用consumer、errorConsumer、completeConsumer分别对发出的元素、错误、onComplete信号进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("subscribe()" ); Flux.just("18" ,"69" ,"24" ) .map( Integer::parseInt ) .subscribe( v -> System.out.println("num: " +v), e -> System.out.println("happen e: " +e), () -> System.out.println("done" ) ); Thread.sleep(1000 *3 ); }
输出如下
1 2 3 4 5 -------------------- subscribe() -------------------- num: 18 num: 69 num: 24 done
可通过subscribe()返回的Disposable对象的dispose方法主动取消订阅
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("subscribe()" ); Disposable subscribe = Flux.interval(Duration.ofSeconds(1 )) .log("#1" ) .subscribe(v -> System.out.println("v: " + v)); Thread.sleep(1000 *5 ); subscribe.dispose(); Thread.sleep(1000 *5 ); }
输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 -------------------- subscribe() -------------------- 15 :20 :38.986 [main] INFO #1 -- onSubscribe(FluxInterval.IntervalRunnable)15 :20 :38.990 [main] INFO #1 -- request(unbounded)15 :20 :40.000 [parallel-1 ] INFO #1 -- onNext(0 )v: 0 15 :20 :40.997 [parallel-1 ] INFO #1 -- onNext(1 )v: 1 15 :20 :41.997 [parallel-1 ] INFO #1 -- onNext(2 )v: 2 15 :20 :42.994 [parallel-1 ] INFO #1 -- onNext(3 )v: 3 15 :20 :43.995 [parallel-1 ] INFO #1 -- onNext(4 )v: 4 15 :20 :44.004 [main] INFO #1 -- cancel()
block() 阻塞当前线程来获取Mono的结果。生产中强烈不推荐使用该方式获取数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("block()" ); String name1 = Mono.just("Aaron" ) .block(); System.out.println("name1 : " + name1); Object name2 = Mono.empty() .block(); System.out.println("name2 : " + name2); Object name3 = null ; try { name3 = Mono.error(new IllegalArgumentException ("Error Name" )) .block(); }catch (Exception ex) { System.out.println("name3 Happen ex: " + ex); } }
输出如下
1 2 3 4 -------------------- block() -------------------- name1 : Aaron name2 : null name3 Happen ex: java.lang.IllegalArgumentException: Error Name
blockFirst()/blockLast() 阻塞当前线程来获取Flux的第一个元素/最后一个元素。生产中强烈不推荐使用该方式获取数据
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("blockFirst()/blockLast()" ); Integer num1 = Flux.just(11 ,22 ,33 ) .blockFirst(); Integer num2 = Flux.just(11 , 22 , 33 ) .blockLast(); System.out.println("num1: " + num1); System.out.println("num2: " + num2); }
输出如下
1 2 3 -------------------- blockFirst()/blockLast() -------------------- num1: 11 num2: 33
toStream() 将Flux转为Java Stream。toStream()本身并不会立即启动数据流,其依然是惰性的。只有当Stream的终端操作被调用时,整个流才会被启动,同时会阻塞当前线程。故生产中应尽量避免使用该方法
1 2 3 4 5 6 7 8 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("toStream()" ); List<Integer> nums = Flux.just(18 , 35 , 24 , 69 , 14 , 6 , 65 ) .toStream() .filter(num -> num > 30 ) .toList(); System.out.println("nums: " + nums); }
输出如下
1 2 -------------------- toStream() -------------------- nums: [35 , 69 , 65 ]
toIterable() 将Flux转为Java Iterable。toIterable()本身并不会立即启动数据流,其依然是惰性的。只有对返回的Iterable开始迭代时,整个流才会被启动,同时会阻塞当前线程。故生产中应尽量避免使用该方法
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("toIterable()" ); Iterable<Integer> iterable = Flux.just(18 , 35 , 24 ) .log("#1" ) .toIterable(); System.out.println("准备开始迭代..." ); iterable.forEach( num -> System.out.println("num: " + num)); }
输出如下
1 2 3 4 5 6 7 8 9 10 11 -------------------- toIterable() -------------------- 准备开始迭代... 15 :25 :49.381 [main] INFO #1 -- | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)15 :25 :49.384 [main] INFO #1 -- | request(256 )15 :25 :49.385 [main] INFO #1 -- | onNext(18 )15 :25 :49.385 [main] INFO #1 -- | onNext(35 )15 :25 :49.386 [main] INFO #1 -- | onNext(24 )15 :25 :49.386 [main] INFO #1 -- | onComplete()num: 18 num: 35 num: 24
toFuture() 将Mono转为Java CompletableFuture。其会立即启动数据流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static void main (String[] args) throws Exception{ ReactorLogUtil.separator("toFuture()" ); Mono<Integer> ageMono = Mono.just(69 ) .delayElement(Duration.ofSeconds(2 )) .log(); System.out.println(LocalTime.now()+ " #1" ); CompletableFuture<Integer> future1 = ageMono.toFuture(); System.out.println(LocalTime.now()+ " #2" ); future1.thenAccept( age-> System.out.println("age: " +age) ); Thread.sleep(1000 *5 ); Mono<Integer> m1 = Mono.just("Aaron" ) .map(Integer::parseInt); CompletableFuture<Integer> future2 = m1.toFuture(); future2.thenAccept( num-> System.out.println("num: " +num) ) .exceptionally( e-> { System.out.println("happen ex: " +e.getMessage()); return null ; }); Thread.sleep(1000 *3 ); }
输出如下
1 2 3 4 5 6 7 8 9 -------------------- toFuture() -------------------- 15 :30 :03.500899 #1 15 :30 :03.523 [main] INFO reactor.Mono.DelayElement.1 -- onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)15 :30 :03.528 [main] INFO reactor.Mono.DelayElement.1 -- request(unbounded)15 :30 :03.529683 #2 15 :30 :05.532 [parallel-1 ] INFO reactor.Mono.DelayElement.1 -- onNext(69 )age: 69 15 :30 :05.533 [parallel-1 ] INFO reactor.Mono.DelayElement.1 -- onComplete()happen ex: java.lang.NumberFormatException: For input string: "Aaron"