响应式编程、Reactor框架(Flux / Mono)

响应式编程 & Reactor框架(Flux / Mono)

(一)响应式编程模型

wiki:
响应式编程是关心数据流、变化传播的异步编程范式,这意味着它可以很容易地通过使用的编程语言表示静态(比如数组)、动态(比如事件发射器)的数据流。

简单来说,响应式编程就是利用异步数据流编程

  • 非响应式编程 接口:每次请求返回对象的个数是固定的
1
2
3
4
5
6
7
8
public interface OrderRepository {
List<Order> getOrders();
}

//如果返回Order数量过多,会导致OOM,所以优化为分页
public interface OrderRepository {
Page<Order> getOrders(Pagable page);
}
  • 响应式编程 接口:返回一个容器,由客户端选择它所需要的对象个数
1
2
3
public interface OrderRepository {
Flux<Order> getOrders(); //返回Flux容器
}

响应式编程中有3个核心概念,分别是:
流、背压、响应式流


1. 流

  • 流:简单来说就是由生产者生产并由一个或多个消费者消费的元素(Item)的序列

    即所谓的Pub-Sub模式(发布-订阅模式)

  • 流的处理模式:Pull(消费者从生产者主动拉取)/Push(生产者将元素推给消费者)

    非阻塞式背压采用的就是Pull模式

  • 流的处理:同步(如果元素不可用,消费者会一直阻塞;如果元素满了,生产者会阻塞)/异步(消费者请求元素后可以处理别的任务,当元素就绪生产者会将其异步发送给消费者)

流量控制

一般情况下,生产者发出数据的速度要远远大于消费者处理数据的速度,如果不做流量控制,消费者会被生产者快速产生的数据流淹没。

  1. 节流(Throttling):消费者直接丢弃无法处理的元素

  2. 缓冲区:使用无界缓冲区进行元素的保存和转发

  3. 调用栈阻塞(Callstack Blocking):同步线程

    也可以理解为阻塞式背压

  4. 背压:消费者需要多少,生产者就生产多少


2. 背压

  • 消费者在消费生产者生产的数据的同时,需要一种能够向上游反馈流量需求的机制,这种机制就叫做“背压”

实现方式

  1. 阻塞式背压:生产者和消费者在同一个线程运行,一方可以阻止另一方的运行
  2. 非阻塞式背压:采用Pull模式,消费者规定生产者生产多少消息量,并且最多只能发送这些量

3. 响应式流

接口

1. Publisher<T>

潜在的包含无限数量的有序元素的生产者,根据收到的请求向当前订阅者发送元素

1
2
3
4
public interface Publisher<T> {
//执行此方法会回调Subscriber的onSubscribe()方法,传入Subscription
public void subscribe(Subscriber<? super T> s);
}
2. Subscriber<T>

消费者从发布者订阅并接收元素。PublisherSubscriber发送订阅令牌(Subscription Token),Subcriber可以使用订阅令牌向Publisher请求多个元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Subsciber<T> {

//回调函数,获取Subscription,用于操作Publisher
//通常会借助Subscription的request()获取n个数据
public void onSubscribe(Subscription s);

//不断回调onNext()发送<=n个数据
public void onNext(T t);

//发生错误时回调onError()
public void onError(Throwable t);

//数据发送完回调onComplete()
public void onComplete();
}

Subscriber接口定义的函数都是Callback回调函数

  1. 当执行Publishersubscribe()方法,会回调onSubscribe()方法,Subscriber通常会借助传入的Subscription对象向Publisher请求n个数据
  2. 生产者Publisher通过不断回调onNext()Subscriber发送<=n个数据
  3. 数据发送完,会回调onComplete()告诉Subscriber流已经发完
  4. 发生错误,会回调onError()告诉Subscriber发生了错误,并结束流
3. Subsciption
  • Subscriber订阅的一个Publisher的令牌。当订阅请求成功时,Publisher将其传递给SubsciberSubsciber通过Sbuscription令牌与Publisher进行交互 —— 比如请求更多元素、取消订阅等。
1
2
3
4
public interface Subscription{
public void request(long n); //请求更多元素
public void cancel(); //取消订阅
}

  • 当调用Publisersubscribe()方法注册Subscriber时,会通过Subsciber的回调方法onSubscribe()传入Subscription对象,之后Subsciber就可以通过这个Subscription对象的request()方法向Publisher请求数据。
4. Processor<T,R>

处理器Processor充当PublisherSubscriber之间的处理媒介,在Pub-Sub管道中充当转换器

1
public interface Processor<T,R> extends Subscriber<T>,Publisher<R> {}

(二)Reactor框架(Flux&Mono

Project ReactorSpring 5中响应式编程采用的默认框架,本质上是观察者模式

  • Reactor是为JVM设计的响应式NIO编程组件, 直接和Java 8函数式API集成(主要是CompletableFutureStreamDuration)。
  • Reactor提供了异步序列API:Flux(0 ~ n)和Mono(0 / 1)。
  • Reactor还通过reactor-netty项目支持了进程内部NIO通信,为了支持微服务,reactor-nettyHTTP/TCP/UDP提供了背压网络引擎。

1.异步操作的2种实现方式

(1)Callback

  • 优点:回调在任务执行过程不会造成任何阻塞
  • 缺点(Callback Hell):回调时代码会从一个类的某个方法跳到另一个类的回调方法,造成流程的不连续性,对于多层嵌套的异步组合而言显得十分笨拙,难以理解和维护

(2)Future

java.util.concurrent.Future接口
  • 将任务提交给FutureFuture就会在一定时间内完成这个任务,在这段时间我们可以去做其他事情
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Future<V> {

//取消任务执行
boolean cancel(boolean mayInterruptIfRunning);

//判断任务是否已经取消
boolean isCancelled();

//判断任务是否已经完成
boolean isDone();

//等待任务执行结束,获取结果
V get() throws InterruptedException, ExecutionException;

//等待任务执行结束,获取结果,等待时间为timeout
V get(long timeout, TimeUnit unit) throws InterruptedException
, ExecutionException, TimeoutException;
}
缺点

Future没有提供通知机制,我们无法得知Future何时完成,只能:

  1. 调用两种阻塞的get()方法等待Future返回结果,相当于同步操作
  2. 使用isDone()不断轮询判断Future是否完成,这样非常消耗CPU资源

    JDK8引入了CompletableFuture,在一定程度上弥补了Future的缺点。

在异步任务完成后,我们使用任务结果时不需要等待,可以直接通过thenAccept()/thenApply()/thenCompose()等方法将前面异步处理的结果交给另一个异步事件处理线程处理


2.ReactorMaven依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

3.Flux组件

参考《Flux官方文档》

onNext x 0..N [onError | onComplete]

  • Flux代表包含0 ~ n个元素的异步序列,onNext()/onComplete()/onError()都适用于Flux

创建Flux

(1) 通过静态方法创建Flux
  1. just()指定序列中包含的全部元素,创建出来的Flux序列会在发布这些元素后自动结束

    在已知元素数量和内容时,just()是创建Flux的最简单方式。

1
2
3
4
5
6
7
//用于创建已知元素个数和内容的Flux
//创建的Flux会在发布所有元素后自动结束
Flux.just("Hello", "World").subscribe(System.out::println);

//输出:
//Hello
//World
  1. fromArray()/fromIterable()/fromStream():如果已有一个数组/Iterable对象/Stream对象,可以通过fromArray()/fromIterable()/fromStream()从这些对象中自动创建Flux



1
2
3
4
5
6
7
//从已知数组中创建Flux
Flux.fromArray(new Integer[]{1,2,3}).subscribe(System.out::println);

//输出:
//1
//2
//3
  1. empty()/error()/never()empty()创建一个不包括任何元素只发布结束消息的序列;error()创建一个只包括错误消息的序列;never()创建一个不包括任何消息通知的序列



empty()never()的区别在于:
empty()包含一个onComplete()消息,而never()任何消息都不含

1
2
3
Flux.empty().subscribe(System.out::println);
//输出:
//
  1. range()range(int start, int count)方法可以创建包含从start起的count个对象的序列,序列中所有对象类型都是Integer

1
2
3
4
5
6
7
Flux.range(1,5).subscribe(System.out::println);
//输出:
//1
//2
//3
//4
//5
  1. interval()interval(Duration period)用于创建一个从0开始递增的包含Long对象的序列,序列中的元素按照指定的时间间隔来发布;interval(Duration delay, Duration period)除了可以指定时间间隔,还可以指定起始元素发布之前的延迟时间


(2) 动态创建Flux
  1. generate()依赖于SynchronousSink组件,同步、逐一产生Flux序列,包括next()/complete()/error(Throwable)三个核心方法,其中next()方法最多只允许调用一次

1
2
3
4
5
6
7
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);

//输出:
//Hello
  1. create()异步、多线程创建

1
2
3
4
5
6
7
8
9
10
11
12
13
Flux.create(sink -> {
for(int i = 0; i < 5; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);

//输出:
//0
//1
//2
//3
//4
  1. push()异步、单线程
1
2
3
4
5
6
7
Flux<String> bridge = Flux.push(sink -> {
myEventProcessor.register(
new SingleThreadEventListener<String>() {
//....
}
);
});

Demo

Hyxtrix的熔断函数

1
2
3
4
5
6
7
8
9
private Flux<Order> getOrdersFallback() {
List<Order> fallbackList = new ArrayList<>();
Order order = new Order();
order.setId("OrderInvalidId");
order.setItem("Order list is not available");
fallbackList.add(order);

return Flux.fromIterable(fallbackList);
}

4.Mono组件

参考《Mono官方文档》

onNext x 0..1 [onError | onComplete]

  • Mono代表包含0 / 1个元素的异步序列,onComplete()/onError()都适用于Mono
  • Mono<Void>也可以用于表示一个空的异步序列,该序列没有任何元素,仅仅包含序列结束的概念(类似于Java的Runnable

创建Mono

(1) 静态创建Mono

Mono的静态方法just()/empty()/error()/never()等都和Flux通用,不再赘述。

  1. delay()在指定延迟后,产生数字0作为唯一值

  2. justOrEmpty()justOrEmpty(Optional<? extends T> data)从一个Optional对象创建Mono,只有当Optional对象包含值时Mono序列才产生对应的元素;justOrEmpty(T data)从一个可能为null的对象中创建Mono,只有对象不为null时,Mono才产生对应的元素

1
2
3
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
//输出:
//Hellow
  1. then():从Mono创建Mono
  2. thenEmpty():返回Mono<Void>
  3. doOnNext(Consumer<? super T> onNext)Mono成功发射一个元素时就出发对应方法,返回新的Mono

(2) 动态创建Mono
  • create()通过MonoSink组件的success()方法创建Mono序列,异步、多线程

1
Mono.create(sink -> sink.success("Hello") ).subscribe(System.out::println);

Demo

Hyxtrix的熔断函数

1
2
3
4
5
6
7
private Mono<Order> getOrderFallback() {
Order order = new Order();
order.setId("OrderInvalidId");
order.setItem("Order list is not available");

return Mono.just(order);
}

5.Flux & Mono 相互转化

  1. 两个Mono序列合并就得到一个Flux序列:Mono.concatWith(Publisher)
  2. 对一个Flux序列进行计数操作,得到的就是Mono对象

6.Flux & Mono 操作符

参考 Reactor框架官方文档:《Reactor 3 Reference Guide》

(1) 转换(Transforming)操作符

转换操作符负责对序列中的元素进行转变

类似Java 8的Stream.map()/Stream.flatMap()

buffer:收集元素到集合
  • buffer操作符把当前流中的元素收集到集合中,并把集合对象作为流中的新元素
  • 使用buffer操作符在进行元素收集时可以指定集合对象所包含的元素的最大数量
1
2
3
4
5
6
7
8
//通过range(1.,20)创建1~20这20个元素
//通过buffer(10)从20个元素中构建集合,每个集合10个元素
//一共可以构建2个集合
Flux.range(1, 20).buffer(10).subscribe(System.out::println);

//输出:
//[1,2,3,4,5,6,7,8,9,10]
//[11,12,13,14,15,16,17,18,19,20]

此外还有:

  1. bufferTimeout(Duration timeout)指定收集的时间间隔
  2. bufferUntil()一直收集,直到断言(Predicate)返回true
1
2
3
4
5
6
7
8
9
10
//一直收集元素到集合,直到i为偶数开始下一次收集
Flux.range(1, 10).bufferUntil( i -> i % 2 == 0)
.subscribe(System.out::println);

//输出:
//[1,2]
//[3,4]
//[5,6]
//[7,8]
//[9,10]
  1. bufferWhile()直到断言(Predicate)返回true才开始收集,一旦值为false会立即开始下一次收集
1
2
3
4
5
6
7
8
9
10
//直到i为偶数才开始收集,一旦i为奇数就结束本次收集
Flux.range(1, 10).bufferWhile( i -> i % 2 == 0)
.subscribe(System.out::println);

//输出:
//[2]
//[4]
//[6]
//[8]
//[10]
map:映射
  • map操作符相当于一种映射操作,它对流中的每个元素应用一个映射函数,从而达到转换效果
flatMap:分别映射,然后合并
  • flatMap操作符将流中的每个元素转换成一个流,然后把转换之后得到的所有流中的元素进行合并
1
2
3
4
5
6
7
8
9
10
11
Flux.just(1,5)
.flatMap(x -> Mono.just(x * x))
.subscribe(System.out::println);
//输出:
//1
//25

//======================================
//对从DB中查询得到的元素进行逐一处理:
Mono<Void> deleteFiles = fileRepository.findByName(fileName)
.flatMap(fileRepository::delete);
window:收集元素到Flux
  • windowbuffer类似,不同的是window将当前流中的元素收集到另外的Flux中,因此返回值是Flux<Flux<T>>
1
2
3
4
5
6
7
8
9
10
11
12
13
Flux.range(1,5).window(2).toIterable().forEach(w -> {
w.subscribe(System.out::println);
System.out.println("---");
});

//输出:
//1
//2
//---
//3
//4
//---
//5

(2) 过滤(Filtering)操作符

过滤操作符负责将不需要的数据从序列中进行过滤

类似Java 8的Stream.filter()

filter:过滤
  • filter只留下满足过滤条件的元素
1
2
3
4
5
6
7
Flux.range(1,10).filter(i -> i % 2 == 0).subscribe(System.out::println);
//输出:
//2
//4
//6
//8
//10
first:返回第一个
last:返回最后一个
skip:忽略前n个元素
skipLast:忽略后n个元素
take:提取前n个元素
1
2
3
4
Flux.range(1,100).take(2).subscibe(System.out::println);
//输出:
//1
//2
takeLast:提取后n个元素
1
2
3
4
Flux.range(1,100).takeLast(2).subscibe(System.out::println);
//输出:
//99
//100

(3) 组合(Combining)操作符

组合操作符负责将序列中的元素进行合并和连接

then:等到上一个操作完成再做下一个
when:等到多个操作一起完成
1
2
3
4
5
6
7
8
9
public Mono<Void> updateFiles(Flux<FilePart> files) {
return files.flatMap(file -> {
Mono<Void> copyFileToFileServer = ...;
Mono<Void> saveFilePathToDB = ...;

//等到两个操作都完成才返回
return Mono.when(copyFileToFileServer, saveFilePathToDB);
});
}
startWith:头插
merge:合并多个流成一个Flux
  • merge合并得到的Flux元素顺序是先到先得的,即多个被合并的流,谁先生产元素,谁的元素就在Flux中排前面
1
2
3
4
5
6
7
8
9
10
11
12
//每过10ms产生一个新元素,先生产的元素在Flux中靠前
//interval是从0开始生产元素的
Flux.merge(Flux.intervalMillis(0, 10).take(3),
Flux.intervalMillis(5, 10).take(3)).toStram()
.forEach(System.out::println);
//输出:
//0
//0
//1
//1
//2
//2
  • 此外还有mergeSequentialmergeSequential不同于merge的地方在于它会先将第一个流中的元素全部放入Flux,才去拿第二个流
1
2
3
4
5
6
7
8
9
10
11
12
//每过10ms产生一个新元素,先排第一个流,然后才是第二个
//interval是从0开始生产元素的
Flux.mergeSequential(Flux.intervalMillis(0, 10).take(3),
Flux.intervalMillis(5, 10).take(3)).toStram()
.forEach(System.out::println);
//输出:
//0
//1
//2
//0
//1
//2
zipWith:将当前流与另一个流元素一对一合并
1
2
3
4
5
Flux.just("a", "b").zipWith(Flux.just("c", "d") )
.subscribe(System.out::println);
//输出:
//[a,c]
//[b,d]
  • 也可以传入一个BiFunction函数对合并的元素进行处理,得到的流的元素类型是该函数的返回值
1
2
3
4
5
6
Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) ->
String.format("%s+%s",s1,s2))
.subscribe(System.out::println);
//输出:
//a+c
//b+d

(4) 条件(Conditional)操作符

条件操作符负责根据特定条件对序列中的元素进行处理

类似Java 8的Optional

defaultIfEmpty:如果原始流为空,返回一个默认元素
1
2
3
4
5
6
7
//HTTP请求中,如果找不到指定数据,返回一个空对象和404状态码
@GetMapping("/article/{id}")
public Mono<ResponseEntity<Article>> findById(@PathVariable String id) {
return articleService.findOne(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.status(404).body(null));
}
takeUntil:一直提取元素直到Predicate返回true
1
2
3
4
5
Flux.range(1,100).takeUntil(i -> i == 3).subscribe(System.out::println);
//输出:
//1
//2
//3
takeWhile:直到Predicate返回true才开始提取元素
1
2
3
4
5
Flux.range(1,100).takeWhile(i -> i <= 99).subscribe(System.out::println);
//输出:
//1
//2
//3
skipUntil:丢弃元素直到Predicate返回true
skipWhile:直到Predicate返回true就丢弃元素

(5) 数学(Mathematical)操作符

数学操作符负责对序列中的元素执行各种数学操作

类似Java 8的数值流、Stream.reduce()

concat:顺序合并来自不同Flux的数据

类似于mergeSequential()

count:统计Flux元素个数
reduce:对流中元素累积(相加),得到一个包含结果的Mono序列
  • reduce()操作符传入一个BiFunction作为累积函数
1
2
3
Flux.range(1,10).reduce((x,y) -> x + y).subscribe(System.out::println);
//输出:
//55
  • reduceWith()reduce()相似,不同点在于reduceWith()可以指定初始值
1
2
3
Flux.range(1,10).reduceWith(() -> 5, (x,y) -> x + y).subscribe(System.out::println);
//输出:
//60

(6) Observable工具(Utility)操作符

Observable工具操作符提供的是一些针对流式处理的辅助性工具类

delay:将事件的传递向后延迟一段时间
subscribe:添加相关的订阅逻辑
  • 在调用subscribe()方法时可以指定需要处理的消息类型(正常消息/完成消息/异常消息)
1
2
3
4
5
Mono.just(100).concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);
//输出:
//100
//java.lang.IllegalStateException
timeout:维持原始被观察者的状态,在特定时间内没有产生任何事件将生产一个异常
block:在接收到下一个元素之前一直阻塞
  • block常用于将响应式数据流转换为传统的数据流
1
2
3
4
public List<Order> getAllOrders() {
return orderService.getAllOrders()
.block(Duration.ofSecond(5));
}

(7) 日志和调试(Log & Debug)操作符

日志和调试操作符提供了针对运行时日志以及如何对序列进行代码调试的工具类

log:观察所有数据并使用日志工具进行跟踪
debug:调试工具

7.Reactor框架背压机制

(1)onBackpressureBuffer:缓存下游没有来得及处理的数据

  • 需要注意的是,如果缓存不设置大小,可能导致OOM

(2)onBackpressureDrop:在下游没有准备好时抛弃元素

(3)onBackpressureLatest:让下游只得到上游最新元素

(4)onBackpressureError:下游跟不上节奏时发送错误信号

-------------本文结束感谢您的阅读-------------

本文标题:响应式编程、Reactor框架(Flux / Mono)

文章作者:DragonBaby308

发布时间:2019年09月26日 - 20:37

最后更新:2019年11月19日 - 00:29

原始链接:http://www.dragonbaby308.com/reactor/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

急事可以使用右下角的DaoVoice,我绑定了微信会立即回复,否则还是推荐Valine留言喔( ఠൠఠ )ノ
0%