反应式流 Reactive Streams 入门介绍

反应式流 Reactive Streams 入门介绍
最新回答
王牌冤家

2022-09-19 04:11:55

反应式流 Reactive Streams 入门介绍

Reactive Streams,翻译为反应式流,是一种针对异步数据流处理的编程范式和规范。下面将从起源、概念、目标、一些思考、与Java版本的关系、具体实现框架以及小结等方面对其进行详细介绍。

1. 起源

Reactive Streams并不是全新的事物,而是为了解决异步编程过程中出现的种种难题而提出的。异步编程时,存在诸如超时、异常处理困难、难以重构以及多个异步任务协同处理等难题。为了解决这些问题,人们提出了反应式编程(Reactive Programming)这一编程范式。反应式编程本质上是对数据流或某种变化所作出的反应,这个变化什么时候发生是未知的,因此它基于异步、回调的方式处理问题。

2013年底,Netflix、Pivotal和Lightbend中的工程师们启动了Reactive Streams项目,希望为异步流(包含背压)处理提供标准。

2. 概念

对于Java程序员来说,Reactive Streams是一个API,它提供了Java中反应式编程的通用API。Reactive Streams非常类似于JPA或JDBC,都是API规范,实际使用时需要使用API对应的具体实现。

Reactive Streams API的范围是找到一组最小的接口、方法和协议,这些接口、方法和协议将描述必要的操作和实体,从而实现具有非阻塞背压的异步数据流。从代码结构上看,它主要包含两部分:reactive-streams和reactive-streams-tck。其中TCK意为技术兼容包(Technology Compatibility Kit),为实现Reactive Streams接口提供帮助。

Reactive Streams API中仅仅包含了如下四个接口:

  • Publisher:发布者,负责发布数据流。
  • Subscriber:订阅者,负责订阅数据流并接收数据。
  • Subscription:表示Subscriber消费Publisher发布的一个消息的生命周期,包括请求数据和取消订阅等操作。
  • Processor:处理器,表示一个处理阶段,它既是订阅者也是发布者,并且遵守两者的契约。

3. 目标

Reactive Streams的主要目标有两个:

  • 管理跨异步边界的流数据交换,即将元素传递到另一个线程或线程池。
  • 确保接收方不会强制缓冲任意数量的数据,为了使线程之间的队列有界,引入了回压(Back Pressure)。

传统异步编程的写法中,不同任务分别在不同的线程中执行,协调这些线程执行的先后顺序、线程间的依赖顺序是一件非常麻烦的事情。而Reactive Streams就是为了解决该问题,它提供了统一的规范来管理异步数据流。

另外,Reactive Streams规范引入了回压(Back Pressure),可以动态控制线程间消息交换的速率,避免生产者产生过多的消息,消费者消费不完等类似问题。

4. 一些思考

  • Reactive:这是个形容词,翻译为“反应的”。Reactive Streams是基于消息驱动的(也可以说是事件驱动的),当消息产生时,系统被动接受消息,并作出反馈,而非主动处理。因此,我们也可以这样理解:被动地接收消息后,作出相应的反应动作,这个行为称之为“反应式”。

  • Streams:这是个名词,翻译为“数据流”。反应式编程的核心思想体现在了这个单词上。流的定义是随着时间顺序排列的一组序列。一切皆是流(Everything is a stream)。我们可以把一组数据抽象为流(可以想象流是一个数组),把对流中节点的逻辑处理抽象成对节点的一步一步的处理,围绕该节点做加工处理,最终获得结果。

  • 非阻塞、异步:反义词是阻塞、同步。目前在Java中,大多数应用程序是同步的,即暴力创建线程,线程阻塞时一直等待直到有结果返回。异步最吸引人的地方在于资源的充分利用,不把资源浪费在等待的时间上,代价是增加了程序的复杂度。而Reactive Streams封装了这些复杂性,使其变得简单。

  • 背压(back-pressure):背压是从流体动力学中借用的类比,在维基百科的定义是抵抗所需流体通过管道的阻力或力。在软件环境中,可以调整定义为通过软件抵抗所需数据流的阻力或力量。背压是为了解决上游组件产生过量的消息,导致下游组件无法及时处理的问题。通过背压机制,下游组件可以向上游组件传达其正在遭受压力的事实,并让它们降低负载。

5. 与Java1.8、Java1.9的关系

Reactive Streams不要求必须使用Java8,Reactive Streams也不是Java API的一部分。但是使用Java8中的lambda表达式可以发挥Reactive Streams规范的强大特性。例如,Reactive Streams的实现Project Reactor项目的当前版本就要求最低使用Java1.8。

Java8中的Stream和Reactive Streams都使用了流式处理的思想,围绕数据流处理数据,即完成了从命令式到声明式的转换,使数据处理更方便。不同的地方在于,Java8中的Stream是同步的、阻塞的,而Reactive Streams是异步的、非阻塞的。

当使用Java1.9时,Reactive Streams已成为官方Java 9 API的一部分,Java9中Flow类下的内容与Reactive Streams完全一致。

6. 具体实现框架

Reactive Streams的实现现在比较多了,以下是一些常见的实现框架:

  • RxJava:RxJava是ReactiveX项目中的Java实现。RxJava早于Reactive Streams规范,但RxJava 2.0+确实实现了Reactive Streams API规范。

  • Reactor:Reactor是Pivotal提供的Java实现,它作为Spring Framework 5的重要组成部分,是WebFlux采用的默认反应式框架。

  • Akka Streams:Akka Streams完全实现了Reactive Streams规范,但Akka Streams API与Reactive Streams API完全分离。

  • Ratpack:Ratpack是一组用于构建现代高性能HTTP应用程序的Java库。Ratpack使用Java 8、Netty和Reactive原则。可以将RxJava或Reactor与Ratpack一起使用。

  • Vert.x:Vert.x是一个Eclipse Foundation项目,它是JVM的多语言事件驱动的应用程序框架。Vert.x中的反应支持与Ratpack类似。Vert.x允许我们使用RxJava或其Reactive Streams API的实现。

7. 小结

在Reactive Streams之前,各种反应库无法实现互操作性。例如,早期版本的RxJava与Project Reactor的早期版本不兼容。另外,反应式编程无法大规模普及的一个重要原因是并不是所有库都支持反应式编程。

Reactive Streams的推出统一了反应式编程的规范,并且已经被Java9集成。由此,不同的库可以互操作了,互操作性是一个重要的多米诺骨牌。例如,MongoDB实现了Reactive Streams驱动程序后,我们可以使用Reactor或RxJava来使用MongoDB中的数据。

综上所述,Reactive Streams为Java中的异步数据流处理提供了统一的规范和API,通过引入非阻塞背压机制等特性,解决了传统异步编程中的诸多难题。随着越来越多的库和框架支持Reactive Streams,反应式编程将在未来得到更广泛的应用和发展。