Streaming 102:批处理之外的流式世界第二部分

1. 引言

欢迎回来!如果你错过了我之前的博文:Streaming 101:批处理之外的流式世界第一部分,我强烈建议你先花时间阅读这篇文章。在这篇文章介绍的内容是下面介绍内容的基础,并且当你阅读这篇文章时,我假设你已经熟悉第一篇文章中介绍的术语和概念了(有些东西在这篇文章不会详细介绍)。现在我们进入正题。先简要回顾一下,上篇文章我主要关注的三个方面:

  • 术语:当使用流等多语义术语时,明确了我要表达的意思;
  • 批与流的比较:比较这两种系统的理论能力,并提出流处理系统超越批处理系统只需要两件事:正确性和时间推理工具;
  • 数据处理模式:介绍批处理和流处理系统处理有限和无限数据时所采用的方法。

在这篇文章中,我接着上次进一步介绍数据处理模式,但这次借助具体示例来更详细的介绍。这篇文章主要分为两个章节:

  • Streaming 101 精简版:简要回顾 Streaming 101 中引入的概念,并借助具体示例来突出一下重点。
  • Streaming 102:Streaming 101 的姊妹篇,详细介绍处理无限数据集的一些其他重要概念,并通过一些具体示例来进行解释。

当我们读完这篇文章时,我们会学习到一个具有鲁棒性的乱序数据处理所需的核心原则和概念以及可以实现超越经典批处理系统的时间推理工具。为了让你有直观的感受,我会使用 Dataflow SDK 代码(即 Google Cloud Dataflow 的 API),并结合动画来表达一些概念。使用 Dataflow SDK 而不是你可能更熟悉的 Spark Streaming 或 Storm 的原因是,目前几乎没有其他系统可以提供我想要表达所有示例的能力。好消息是其他项目也开始朝这个方向发展。更好的消息是,我们(谷歌)今天向 Apache 软件基金会提交了一份提案,来创建一个 Apache Dataflow 孵化器项目(与 data Artisans、Cloudera、Talend 和其他一些公司合作),希望围绕数据流模型提供的强大的乱序处理语义建立一个开放的社区和生态系统。

很抱歉,这篇文章中缺少了我上次承诺的比较部分。我低估了这篇文章中包含的内容以及需要完成的时间。我不想为了完成这个一部分再看到时间上的延迟以及再做一些其他扩展。如果有什么安慰的话,我在 Strata + Hadoop World Singapore 2015 上做了一个 The evolution of massive-scale data processing 演讲(并将在 6 月的 Strata + Hadoop World London 2016 上给出它的更新版本),这里面会提供缺失的比较部分的资料;同时,会奉上一个精美的幻灯片

2. 总结和路线图

Streaming 101中,我们首先明确了一些术语。我们先区分了有限数据和无限数据。有限数据源的大小是有限的,通常被称为 ‘batch’ 批数据。无限数据源一般大小都是无限的,并且通常被称为 ‘streaming’ 流数据。我会尽量避免使用批和流术语来指代数据源,因为这些名称会让我们产生误解。然后,我们继续定义了批处理引擎和流处理引擎之间的区别:批处理引擎是那些仅为有限数据设计的引擎,而流处理引擎在设计时考虑到了无限数据。我的目标是只在谈及执行引擎时才使用批和流这样的术语。

在介绍完术语之后,我介绍了两个与处理无限数据有关的重要概念。首先明确了事件时间(事件发生的时间)和处理时间(处理期间观察到时间)之间的重要区别。这为Streaming 101提出的主要论点之一提供了基础:如果你关心正确性和事件实际发生的上下文,那么必须根据事件固有的事件时间来分析数据,而不是用它们在分析过程中的处理时间。最后我介绍了窗口的概念(即,将数据集按时间边界划分),这是处理无限数据源的一种常见方法。窗口策略中比较简单的是固定窗口的和滑动窗口,也还有更复杂的窗口类型,例如会话窗口(窗口由数据本身的特征定义,例如捕获每个用户的活动会话)。

除了这两个概念之外,我们还需要仔细研究一下如下三个概念:

  • Watermarks:衡量事件时间输入完整性的一个概念。时间为 x 的 Watermark 表示事件时间小于 X 的所有输入数据都已经到达。因此,当不知道无限数据源什么时候结束时,Watermark 就用作进度的度量。
  • Triggers:触发器是一种声明窗口何时触发计算输出的机制(响应某个外部信号)。触发器在选择什么时候发送输出时提供了一定的灵活性。这为窗口演变时多次观察窗口输出提供了可能。反过来,也为随着时间的推移修正结果提供了可能,我们可以在数据到达时提供一个推测结果,然后处理随着时间变化的上游数据以及 Watermark 之后的迟到数据(修正结果)。(比如手机传感器,当用户离线时,他们记录了手机各种各样的信息,而当用户重新连线时把这些离线时收集的数据继续传送上报)。
  • Accumulation:累积模式指定了在同一窗口中观察到的多个结果之间的关系。这些结果可能完全不相关,只是随着时间推移各自的增量,或者它们之间可能存在重叠。不同的累积模式具有不同的语义和与之相关的成本,因此需要从多种使用用例中寻找合适的模式。

最后,为了更好的理解这些概念之间的关系,我们可以在回答下面四个问题的过程中温故知新,这些问题对于无限数据处理来说是至关重要的:

  • What:计算逻辑是什么?这个问题的答案是 Pipeline 的 transforms。这其中包括计算总和、构造直方图以及机器学习模型训练等。同样在经典批处理中也是如此。
  • Where:计算的事件时间范围?这个问题的答案是 Pipeline 的事件时间窗口。这包括Streaming 101中介绍的常见窗口(固定,滑动和会话窗口),没有窗口概念的用例(例如,Streaming 101中描述的与时间无关的处理;经典的批处理也通常属于这种类别)以及其他更复杂的窗口类型。还要注意,如果将记录到达系统时的摄入时间作为事件时间时,也可以包含处理时间窗口。
  • When:什么时候(处理时间)输出结果?这个问题的答案是 Watermark 和 Triggers。最常见的模式是使用 Watermark 来描述给定窗口的输入是否完成,使用 Triggers 指定是否输出提前结果(在窗口完成之前发送推测结果)和迟到结果(Watermark 仅是对完整性的评估,在 Watermark 声明给定窗口的输入完成之后可能还会有其他的输入数据)。
  • How:如何修正相关结果?这个问题的答案是所使用的 Accumulation 类型:丢弃(结果都是独立且不同的),累积(后来的结果建立在先前的结果之上)以及累积并撤回(输出累积值和先前已被触发值的撤回)。

3. Streaming 101 总结

首先,让我们回顾一下在Streaming 101中提出的一些概念,但是这次会提供一些详细的例子,这些例子将有助于我们更好的理解这些概念。

3.1 What: transformations

经典的批处理应用中的 transformations 回答了第一个问题:计算逻辑是什么?尽管你们可能对经典的批处理已经很熟悉了,但是我们还是从这里开始,因为它是我们添加所有其他的概念的基础。

在本节中,我们会看到一个简单的例子:在由 10 个值组成的简单数据集上分 Key 计算 SUM。对于每个示例,我都会提供一个 Dataflow Java SDK 伪代码的简短片段。从某种意义上说,这是伪代码,有时我会略作修改以使示例更清晰、也会省略一些细节(比如使用具体的I/O源)以及简化名称(Java 中当前的触发器名称非常冗长;为了清晰,我将使用更简单的名称)。除了诸如此类的小事之外,基本上都是真实的 Dataflow SDK 代码。后面我还会为那些对类似例子(可以编译和运行)感兴趣的人提供一个实际代码演练的链接。

如果你熟悉 Spark Streaming 或 Flink 等类似的工具,那么很容易理解 Dataflow 的代码。Dataflow 中有两个基本原语:

  • PCollections:表示可以执行并行转换操作的数据集(可能是大数据集)(因此名称以 p 开头)。
  • PTransforms:可以应用在 PCollections 上创建新的 PCollections。PTransforms 逐元素执行转换,可以将多个元素聚合在一起,也可以跟其他 PTransforms 组合操作。

图1

就我们的例子而言,我们假定从名为 ‘input’ 的 PCollection<KV<String,Integer>> (PCollection 由 Strings 和 Integer 的键/值对组成,其中 Strings 像是团队名称,Integer 则是对应每个团队成员分数)开始。在现实世界的 Pipeline 中,我们从来自 I/O 数据源的原始数据(例如,日志记录) PCollection 来获取输入,然后将日志记录解析为键/值对,并转换为 PCollection< KV<String,Integer>>。在为了第一个例子更加清晰,我会包含所有步骤的伪代码,但在随后的例子中,我会忽略 I/O 和解析部分。因此,简单地从 I/O 源读取数据,解析出团队/分数,并计算每个团队总分数的 Pipeline 如下所示:

// 代码1
PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input.apply(Sum.integersPerKey());

上述代码从 I/O 数据源读取键/值对数据,其中 Key 是 String 类型的团队名称,Value 是 Integer 类型的团队每个成员分数。然后根据 Key 分组求和产生每个团队的总分数。

对于接下来的所有例子,我们都会先看 Pipeline 的代码,然后再用动画展示一个数据集的运行过程。更具体地说,我们会看到一个 Key 的 10 条输入数据的执行过程。在一个真正的 Pipeline 中,你可以想象类似的操作会在多台机器上并行执行,但是在这里会尽量简化。每个动画都有两个维度:X 轴上的事件时间和 Y 轴上处理时间。白色的粗线条从底部向上移动表示 Pipeline 的实时进度,圆圈表示输入,圆圈内的数字表示记录的值。当观察到它们时,它们由之前的灰色变成白色,同时将它们累加在状态中,并最终将聚合结果输出。状态和输出由矩形表示,靠近顶部展示一个聚合值,矩形覆盖的区域表示部分事件时间和处理时间已经累加到结果中。对于上述代码中的 Pipeline,在经典的批处理引擎上执行时看起来就像下面一样:

图2

由于这是一个批处理 Pipeline,因此会累积状态,直到所有输入完成(到达顶部的绿色虚线时表示看到所有的输入),此时产生最终的结果为 51。在这个示例中,因为我们没有使用任何窗口转换操作,所以我们是在所有事件时间上计算的总和,因此状态和输出的矩形覆盖了整个 X 轴。如果我们处理的是一个无限数据源,那么经典的批处理是不够的。我们不能等待所有输入结束,因为输入永远不会结束。在这里我们需要使用窗口这个概念,上篇文章中已经介绍了这个概念。因此,在回答第二个问题之前:计算的事件时间范围?,先我们简要回顾一下窗口这个概念。

3.2 Where: windowing

正如上次讨论的那样,窗口是沿着时间边界分割数据源的过程。常见的窗口策略有固定窗口、滑动窗口和会话窗口:

图3

为了更好地在实践中理解在窗口,我们以 2 分钟的固定窗口求和为例。使用 Dataflow SDK 比较简单,添加 Window.into 转换操作即可:

// 代码2
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
.apply(Sum.integersPerKey());

Dataflow 提供了一个适用于批处理和流式传输的统一模型(语义上批处理只是流式处理的一种特殊情况)。因此,我们先在批处理引擎上执行这个 Pipeline;机制比较简单,可以与切换到的流处理引擎直接进行比较。

图4

和以前一样,输入在状态中累积,直到所有输入完成,最后才输出最终结果。在这种情况下,我们会得到四个输出:四个两分钟事件时间窗口都有对应的一个输出。

到目前,我们重新回顾了 Streaming 101 中引入的两个概念:事件时间和处理时间之间的关系以及窗口。如果我们想继续探讨,我们需要了解本节涉及的新概念:Watermark、Triggers 和累积模式。

4. Streaming 102

我们刚才观察到的是在批处理引擎上执行一个窗口的 Pipeline。但理想情况下,我们希望结果具有较低的延迟。切换到流式引擎是才是正确的方向,对于批处理引擎我们都知道,每个窗口的输入都是完整的(即一旦有限输入源中的所有数据都已被消费),但是对于无限数据源,我们目前缺乏确定其完整性的实际方法。因此引入了 Watermarks。

4.1 When: watermarks

Watermark 回答了 “什么时候(处理时间)输出结果?” 的前半部分。Watermark 是一个事件时间概念,用来衡量输入数据的完整性。换一种说法,它是系统以事件时间为尺度衡量记录在事件流中处理的进度和完整性的方法(无论是有限还是无限数据,尽管在无限数据中作用更明显)。回想一下 Streaming 101 中这张图(在这里稍作了修改),对于大多数现实世界分布式数据处理系统,事件时间和处理时间之间的偏差可以用随时间不断变化的函数表示。

图5

上图中那条弯弯曲曲的红线实际上就是 Watermark,可以随着处理时间的推移能够获取事件时间完整性的进度。从概念上讲,可以将 Watermark 看作是一个函数 F(P) -> E,输入一个处理时间点输出一个事件时间点。(更准确地说,函数的输入实际上是 Pipeline 中观察到 Watermark 时间点上游所有的当前状态:输入源,缓冲的数据,正在处理的数据等;但从概念上讲,可以简单的理解为处理时间到事件时间的映射)。事件时间点 E 表示事件时间小于 E 的那些输入数据都已经被看到了。换句话说,我们断言不会再看到事件时间小于 E 的其他数据了。根据 Watermark 的类型,完美 Watermark 和启发式 Watermark 会分别提供严格保证和有依据的猜测:

  • 完美 Watermarks:在对所有输入数据充分了解的情况下(了解数据的最大延迟等),可以构建完美 Watermarks;在这种情况下,没有迟到数据(late),所有数据都是提前(early)或者准时到达(on-time)。
  • 启发式 Watermarks:对于许多分布式输入源,充分了解输入数据是不切实际的,在这种情况下,最佳选择是启发式 Watermarks。启发式 Watermarks 充分利用任何可以获取到的输入信息(分区,分区内的排序(如果有的话),文件的增长率等)来提供尽可能准确的进度估计。在许多情况下,这种 Watermark 预测中是非常准确的,但是使用这种 Watermark 意味着有时可能是错的,这会导致迟到数据。我们会在下面的触发器部分中了解如何处理迟到数据。

Watermark 是一个非常吸引人且复杂的话题,但是它不是这篇文章讨论的重点,以后有机会专门写一篇文章来介绍。现在,为了更好地理解 Watermark 的作用和缺点,我们来看看两个使用 Watermark 流引擎的例子,以确定在执上述代码的窗口化 Pipeline 时何时输出结果。左边的例子使用的是完美 Watermarks,右边使用的是启发式 Watermarks。

图6

在这两种情况下,当 Watermark 到达窗口结尾时,窗口就会被触发计算。两次执行的主要区别在于右侧的 Watermark 计算使用的是启发式算法没有考虑到 9 这个值,这很大程度上改变了 Watermark 的形状。这两个例子突出了 Watermark (以及其他的完整性概念)的两个缺点:

  • Watermark 太慢:当任何类型的 Watermark 由于知道还有未处理的数据(例如,由于网络带宽限制而缓慢增长的输入日志)明确地延迟时,并且结果的计算需要依赖 Watermark 的推进,那么效果直接转换为输出中的延迟。这在左图中体现最明显,迟到到达的数据 9 卡住了所有后续窗口的 Watermark,就算这些窗口的输入数据早已经到达(也无法触发计算输出结果)。这在第二个窗口 [12:02,12:04] 中尤其明显,从窗口中第一个值到达到我们看到窗口输出结果花费了将近7分钟。启发式 Watermarks 表现的没有那么严重(花费了5分钟),但不要认为启发式 Watermarks 不会受到 Watermark 延迟的影响;这个例子的效果与我选择的记录有关系(没有体现出来罢了)。需要注意的是:虽然 Watermark 提供了非常有用的完整性概念,但是从延迟角度来看,根据完整性生成输出往往不是很理想。设想一个仪表板,按小时或天划分窗口。你不太可能要等一小时或一天才能看到当前窗口的结果;这也是使用经典批处理系统为此类系统的痛点之一。我们更希望可以随着时间推移看到窗口的结果发生变化,最终产生完整地结果。
  • Watermark 太快:当启发式 Watermarks 错误的比原本的提前了,事件时间在 Watermark 之前的数据可能会延迟到达,从而产生迟到数据。在右边的例子就出现了这样的情况:在观察到该窗口的所有输入数据到达之前,Watermark 就提前到达了第一个窗口的结尾,导致错误的输出值 5 而不是 14。这是启发式 Watermarks 的一个严重问题,这种启发特性意味着它们有时会出错。因此,如果你关心正确性,只依靠它们来确定何时输出是不够的。

在 Streaming 101 中,我就强调完整性不足以解决无限数据流的乱序问题。Watermark 太慢和太快这两个缺点,是这个论点的理论依据。你不能寄希望系统只依赖完整性就能获得低延迟和正确性。触发器就是为了解决这些缺点而引入的。

4.2 When: Triggers

触发器(Triggers)回答了 “什么时候(处理时间)输出结果?” 的后半部分。触发器决定了窗口在处理时间上什么时候输出(尽管触发器本身可以根据其他时间概念作出上述决策,例如基于事件时间的 Watermark 处理)。窗口的每个特定输出都称为窗口的窗格(pane)。有几种触发的信号,如下所示:

  • Watermark 的进度(即事件时间进度):上图 6 中已经隐含了触发器,当 Watermark 到达窗口末尾时输出被触发。另一个用例是当一个窗口的生命周期结束时会触发垃圾回收,我们稍后会看到这个例子。
  • 处理时间进度:对于提供有规律与周期性的更新非常有用,因为处理时间(不像事件时间)均匀的运行,不会出现延迟。
  • 元素个数:当一定数量的元素到达窗口时会触发。
  • 特殊标记:当遇到指定记录或者具有某些特征的记录(例如,EOF元素或刷新事件)时窗口触发。

除了基于具体信号触发的简单触发器之外,还有复合触发器,可以允许创建更复杂的触发逻辑。复合触发器如下所示:

  • 重复触发器:与处理时间触发器配合使用,在提供有规律与周期性更新的场景下特别有用。
  • 逻辑与触发器(AND):只有当所有子触发器触发时(例如,在 Watermark 到达窗口结尾并且观察到终止标点符记录之后)才触发。
  • 逻辑或触发器(OR):任何一个子触发器触发时(例如,在 Watermark 到达窗口结尾或者观察到终止标点符记录之后)才触发。
  • 顺序触发器:按照预先定义的顺序触发一系列的子触发器(后一个子触发器必须等待前一个触发器触发)。

为了更具体的了解触发器,我们将上述代码 2 中的隐式触发器显示添加到代码中:

// 代码3
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(AtWatermark()))
.apply(Sum.integersPerKey());

考虑到我们已经对触发器有了基本了解,现在我们可以考虑解决 Watermark 太慢以及太快的问题。在这两种情况下,我们希望为窗口提供某种周期性的更新,无论是 Watermark 到达窗口结尾之前还是之后(不仅仅是在 Watermark 通过窗口的结尾时触发更新)。所以,我们需要某种重复触发器。那么问题就变成了:重复触发的条件是什么?

在太慢的情况下:由于输出太晚,所以要提早提供推测结果。我们假定任何窗口都有稳定的输入数据量,我们知道窗口输入是不完整的,还有数据尚未到达。因此,按照处理时间周期性(例如,每分钟一次)触发可能是一种明智的做法。因为触发器触发的次数不会取决于窗口内观察到的实际数据量,在最坏的情况下,也就是源源不断的周期性触发。

在太快的情况下:使用启发式 Watermarks 会出现迟到数据,所以当出现迟到数据时需要修正结果。假设我们的 Watermark 基于相对准确的启发式算法(通常是合理安全的假设)。在这种情况下,不会有过多的迟到数据,但是当看到迟到数据时,需要快速修正我们的结果。只要看到一个迟到元素时就要立即触发更新。考虑到这种迟到数据不会太多,不会对我们系统的负载产生太大影响。请注意,这里只是举了一个例子,你可以根据自己的情况自由选择触发的条件(比如只在上面某一种情况下触发,或者都不触发)。

最后,我们需要协调各种触发的时机:提前、准时或者迟到。我们可以通过顺序触发器和一个特殊的 OrFinally 触发器来完成这个工作,OrFinally 触发器有一个子触发器,当子触发器触发时会终止父触发器。

// 代码4
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
Sequence(
Repeat(AtPeriod(Duration.standardMinutes(1))).OrFinally(AtWatermark()),
Repeat(AtCount(1))
)
)
.apply(Sum.integersPerKey());

但是,这种写法很罗嗦。考虑到 repeated-early | on-time | repeated-late 触发模式非常常见,所以我们在 Dataflow 中提供了一个自定义的(但是语义上相同的)API,使得指定这样的触发器更简单清晰:

// 代码5
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1))))
.apply(Sum.integersPerKey());

在流式引擎上执行上述两段代码(与之前一样使用完美 Watermarks 和启发式 watermarks),然后生成如下所示的结果:

图 7

这个版本比图 6 有两个明显的改进:

  • 对于第二个窗口 [12:02, 12:04) 中的’Watermark 太慢’的情况:我们现在每分钟提供一次周期性提前更新。在完美 Watermark 情况下,差异最为明显,首次输出时间从接近七分钟减少到三分半。在启发式 Watermark 场景下也得到了明显的改进。这两个版本随着时间的推移都不断的修正结果(值分别为 7、14 和 22 的窗格),在输入完成和窗口的最终输出之间提供了相对最小的延迟。
  • 对于第一个窗口 [12:00, 12:02) 中的’启发式 Watermark 太快’的情况:当迟到数据 9 到达时,我们立即更新输出结果,输出了正确的窗格值 14。

这些新触发器的让人兴奋的作用是有效地统一了完美和启发式 Watermark 版本之间的输出模式。虽然图 6 中的两个版本截然不同,但这里的两个版本看起来已经非常相似了。此时剩下最大的差异是窗口生命周期。在完美 Watermark 情况下,我们知道一旦 Watermark 到达了窗口结尾,我们再也不会看到窗口的其他任何数据,因此我们可以在这个时候删除窗口的所有状态。但在启发式 Watermark 的情况下,我们仍然需要保留一段时间的窗口状态,以解决迟到数据。但是到目前为止,我们的系统还没有任何好的方法可以知道每个窗口的状态需要保留多长时间。这里我们需要引入’可允许的迟到’这个概念。

4.3 When: allowed lateness

在进入最后一个问题’如何修正相关结果?’之前,我们先讨论处理长期无序数据数据流系统必备的一个功能:垃圾回收。图 7 的启发式 Watermark 例子中,窗口的状态在该示例的整个生命周期内都会保存。为了处理迟到数据,这么做是有必要的。虽然将我们所有的持久状态保存到时间结束时是一件很棒的事情,但是,在实际环境中,当处理无限数据源时,无限期地保存窗口状态(包括元数据)是不切实际的,最终会耗尽磁盘空间。

因此,任何现实世界的无序处理系统都需要提供某种方法来限制窗口的生命周期。一种简单的方法是在系统内定义可允许的迟到时间范围,即需要对记录可能迟到多长时间(相对于 Watermark)设置一个上限,以便系统进行处理;在此范围之后到达的任何数据都会被简单地丢弃。一旦确定了单条数据的迟到时间,你就能精确地确定窗口的状态必须保存多长时间:直到 Watermark 超过窗口结束后的可迟到时间范围。此外,还允许系统立即删除观察到任何晚于可迟到时间范围内的数据,这意味着系统不会浪费资源处理没人关心的数据。

由于可允许的迟到时间范围与 Watermark 之间的交互有点微妙,所以我们需要看一个例子。我们在代码 5 中添加一分钟的可允许的迟到时间范围(请注意,这里选择这个迟到时间范围是因为它比较适合图表展示,但在实际用例中,迟到时间范围可能会有更大):

// 代码6
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.withAllowedLateness(Duration.standardMinutes(1)))
.apply(Sum.integersPerKey());

上述代码的执行类似于下图 8 所示,在图中我添加了如下功能来突出可允许的迟到时间范围的影响:

  • 粗白线表示处理时间的当前位置,现在用有注释的虚白线给所有窗口标注可允许的迟到时间范围(注意的是迟到时间范围是使用的事件时间)。
  • 一旦 Watermark 超过了窗口的可允许的迟到时间范围,窗口就会关闭,这意味着窗口的所有状态都会被丢弃。图中的虚线矩形表示窗口关闭时覆盖的时间范围(两个时间上的范围:处理时间和事件时间),一条小尾巴向右延伸表示窗口的可允许迟到时间范围(事件时间上的范围,用于与 Watermark 对比)。
  • 我在第一个窗口中添加了一个额外的迟到数据 ‘6’。虽然是迟到数据,但仍在可允许的迟到时间范围内,因此这个数据到达时也会更新结果(11)。但是,迟到数据 9 已经超过了可允许的迟到时间范围,所以被简单地丢弃了。

图8 流引擎上的窗口求和,有 early 和 late 触发并设置了可允许的迟到时间范围

关于可允许的迟到时间范围最后有两个注意点:

  • 如果你使用的是来自具有完美 Watermark 数据源的数据,那么无需处理迟到数据,并且将可允许的迟到时间范围设置 0 秒即可。这就是图 7 中完美 Watermark 部分中看到的效果。
  • 指定可允许的迟到时间范围有一个例外:即使使用启发式 Watermark 时,也可以像为有限数量的 Key 计算全局聚合结果(例如,按 Web 浏览器类型分组计算网站的总访问次数)。在这种情况下,系统中活动窗口的数量受限于有限 Key 的个数。只要 Key 的数量保持在可管理的低水平,就无需担心通过可允许的迟到来限制窗口的生命周期。

4.4 How: accumulation

随着时间的推移,触发器会为一个窗口产生多个窗格。到这,我们剩最后一个问题:如何修正相关结果?在我们目前看到的例子中,每个连续的窗格都建立在它前面的窗格之上。实际上存在三种不同的累积模式:

  • 丢弃(Discarding):每次物化输出窗格时,都会丢弃存储状态。这意味着每个窗格都是相互独立的。当下游消费者自己执行某种累积时,这种丢弃模式很有用,例如那些只希望接收差值(delta)来求和的系统,我们只需要把接收到的新数值发送给它即可。
  • 累积(Accumulating):如图 7 所示,每次物化输出窗格时,都会保留存储状态,并将新的输入累积到现有状态中。这意味着每个连续的窗格都建立在前一个窗格的基础上。当新的结果可以简单地覆盖老的结果时,这种累积模式很有用,例如将输出存储在 BigTable 或 HBase 等键/值存储中时。
  • 累积和撤回(Accumulating & Retracting):与累积模式一样,但在物化输出窗格时,会为之前的窗格生成一个单独的撤回。撤回(配合新的累积结果)实质上是明确告诉你:’对不起,我之前告诉你的结果是 X 是错的。把我上次告诉你的 X 撤回,换成 Y’。撤回在如下两种情况下特别有用:
    • 当下游消费者按不同维度对数据进行重新分组时,新产生的值完全有可能会与旧值不同,因此新数据最终会进入不同的分组中。在这种情况下,新值不能覆盖旧值;您需要从旧组中删除旧值,在新组中加入新产生的值。
    • 当使用动态窗口(例如,会话窗口)时,由于窗口合并,新值可能会替换多个先前的窗口。在这种情况下,很难仅从新窗口中确定哪些旧窗口需要被替换。对旧窗口进行明确的撤回会使任务变得简单。

三种不同的累积模式放在一起对比查看时,不同模式的不同语义会更加清晰。我们以图 7 中第二个窗口为例,该窗口出现了三个窗格(事件时间范围为 [12:02, 12:04))。下表展示了在三种累积模式下每个窗格的值是什么样的(图 7 使用的是累积模式):

表1

  • 丢弃模式:每个窗格仅包含在该窗格期间到达的值。因此,观察到的最终值并不等于最终的总和。但是,如果你将每个窗格的值相加,你就会得到正确的值 22。这就是为什么当下游消费者本身在窗格上执行某种聚合时丢弃模式很有用的原因。
  • 累积模式:如图 7 所示,每个窗格都包含在该窗格期间到达的值,以及来自上一个窗格的所有值。因此,观察到的最终值正确的计算出总和 22。但是,如果你将每个窗格的值相加,那么实际上对窗格 2 重复计算了两次,对窗口 1 重复计算了三次,给到你的总和 51 也不是正确的。这就是为什么当你可以简单地用新值覆盖以前的值时累积模式最有用的原因:新值已经包含了迄今为止看到的所有数据。
  • 累积和撤回模式:每个窗格都包括一个新的累积值以及对前一个窗格值的撤回。因此,观察到的最后一个(非撤回)值以及所有窗格值(包括撤回值)的总和都为你提供了正确答案 22。这就是撤回如此强大的原因。

如果要查看丢弃模式的实际效果,我们需要对代码 5 做如下修改:

// 代码7
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.discardingFiredPanes())
.apply(Sum.integersPerKey());

在带有启发式 Watermark 的流引擎上再次运行会产生如下输出:

图9

虽然输出的整体形状类似于图 7 中的累积模式,但需要注意的是丢弃模式下任何窗格都没有重叠。因此,每个输出都与其他输出是相互独立的。如果我们想查看实际的撤回效果,修改也是相似的(但是请注意,此时 Google Cloud Dataflow 的撤回仍在开发中,因此 API 中的命名有些推测):

// 代码8
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.accumulatingAndRetractingFiredPanes())
.apply(Sum.integersPerKey());

在流引擎上运行,会产生如下输出:

图10

由于每个窗格都有重叠,因此要清楚地看到撤回有点棘手,因此我们用红色表示撤回,由于与背景蓝色窗格重叠会产生略带紫色的颜色。为了更容易的区分两个值,我稍微调整了下两个数值的位置并把它们以逗号分隔。把图 9、7(仅看启发式)和 10 的最终帧放在一起比较,可以更好的看出三种模式的区别:

图11

可以预想到,按顺序呈现的三种模式(丢弃、累积、累积和收回)在存储和计算成本方面都越来越贵。因此,累积模式的选择是在正确性、延迟和成本三方面的一种权衡。

5. 插曲

到目前为止,我们已经回答了所有四个问题:

  • What:计算逻辑是什么?答案是通过 Transformations。
  • Where:计算的事件时间范围?答案是通过窗口。
  • When:什么时候(处理时间)输出结果?答案是通过 Watermarks 和 Triggers。
  • How:如何修正相关结果?答案是通过累积模式。

上面我们上只研究了一种窗口:基于事件时间的固定窗口。从 Streaming 101 中我们知道,窗口有好几种类型,在结束之前我们一起看一下这两种窗口:处理时间固定窗口和事件时间会话窗口。

5.1 When/Where: 处理时间窗口

处理时间窗口很重要,原因有两个:

  • 对于某些用例,例如使用量监控(例如,Web 服务流量 QPS),如果想要分析流入数据量,处理时间窗口绝对是最合适的方法。
  • 对于事件发生时间很重要的用例(例如,分析用户行为趋势、计费、评分等),处理时间窗口绝对是错误的选择,能够识别这些情况也至关重要。

因此,深入了解处理时间窗口和事件时间窗口之间的差异是非常值得的,特别是考虑到当今大多数流系统中处理时间窗口被广泛应用。当我们面对的模型是严格使用事件时间时(例如本文的例子),有两种方式可以实现处理窗口:

  • Triggers:忽略事件时间(即,使用横跨全部事件时间的全局窗口)并使用触发器在处理时间轴上触发窗口计算。
  • 摄入时间:将进入系统的时间作为数据到达时的事件时间,并使用事件时间窗口处理数据。Spark Streaming 就是这样做的。

需要注意的是,这两种方法或多或少是等价的,尽管在多阶段(Stage) Pipeline 的情况下会略有不同:

  • 在使用触发器情况下,每个阶段独立地切分处理时间窗口,例如这一阶段窗口 X 中的数据可能会在出现在下一阶段的窗口 X-1 或 X+1 中;
  • 在使用摄入时间情况下,一旦数据进入到窗口 X 中,在整个 Pipeline 期间都只会出现在窗口 X 中。这是由于通过 Watermark(在 Dataflow 中)、微批次边界(在 Spark Streaming 中)或引擎级别的其他协调因素实现了不同阶段的进度同步。

正如我已经注意到的那样,处理时间窗口的最大缺点是当输入的顺序发生变化时窗口的内容也会发生变化。为了更具体地说明这一点,我们将研究如下三个用例:

  • 事件时间窗口
  • 通过触发器实现的处理时间窗口
  • 通过摄入时间实现的处理时间窗口

我们会在这三种用例上分别使用两个不同的数据集(所以,一共会有2*3种情况)。这两个输入集有完全相同的事件(相同的值,相同的事件时间),但是观察到顺序不同(即处理时间不同)。第一个输入集是我们上面一直看到的观察顺序,颜色为白色;第二个输入集在处理时间轴上做了一些移动(改变了处理时间),如下图 12 所示,颜色为紫色。

图12

5.1.1 事件时间窗口

为了建立一个基线,我们首先在启发式 Watermark 的事件时间固定窗口上分别观察这两个输入集的输出结果。我们重用代码 5/图 7 中的 early/late 代码来获得下面的结果。左边基本上是我们之前看到的样子;右边是第二个输入集的结果。这里需要注意的一点是:即使输出的整体形状不同(在处理时间上观察的顺序不同),但四个窗口的最终输出结果都是一样的:14、22、3 和 12:

图13

5.1.2 通过触发器实现的处理时间窗口

现在让我们对比一下上述两种处理时间的方法。首先,我们看一下如何使用触发器实现,需要注意三个方面:

  • 窗口:我们使用全局事件时间窗口,本质上是用事件时间窗格模拟处理时间窗口。
  • 触发器:我们根据所需的处理时间窗口大小在处理时间上定期触发窗口。
  • 累积模式:我们使用丢弃模式来保持窗格之间的彼此独立,从而让它们每个都像一个独立的处理时间’窗口’。

相应的代码类似于代码 9;需要注意的是,全局窗口是默认设置,因此不需指定窗口策略:

// 代码9
PCollection<KV<String, Integer>> scores = input
.apply(Window.triggering(
Repeatedly(AtPeriod(Duration.standardMinutes(2))))
.discardingFiredPanes())
.apply(Sum.integersPerKey());

当使用两种不同顺序的输入数据在流式引擎上执行时,结果如下图 14 所示。需要注意的是:

  • 由于我们通过事件时间窗格模拟处理时间窗口,因此,处理时间轴才是窗口,就是图中 Y 轴的宽度。
  • 由于处理时间窗口对输入数据的顺序很敏感,因此对于这两个输入集中每个’窗口’的结果都不同(即使事件发生在同一时间)。在左边我们看到的是 12、21、18,而在右边我们看到的是 7、36、4。

图14

5.1.3 通过摄入时间实现的处理时间窗口

最后,让我们看看通过将摄入时间映射输入数据的事件时间来实现处理时间窗口。代码方面,这里有四个方面值得一提:

  • 时间修改:当元素到达时,事件时间需要被摄入时间覆盖。需要注意的是,我们目前在 Dataflow 中还没有标准 API(因此在伪代码 I/O 源上使用了虚构的方法)。对于 Google Cloud Pub/Sub,你只需在发布消息时将消息的 timestampLabel 字段留空即可;对于其他来源,你需要自己查阅文档。
  • 窗口:使用标准的事件时间固定窗口。
  • 触发器:由于摄入时间提供了计算完美 Watermark 的能力,我们可以使用默认触发器,在这种情况下,当 Watermark 超过窗口末尾时,会隐式触发一次。
  • 累积模式:由于我们每个窗口只有一个输出,因此累积模式不重要。

因此,实际代码如下所示:

// 代码10
PCollection<String> raw = IO.read().withIngressTimeAsTimestamp();
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
.apply(Sum.integersPerKey());

在流式引擎上执行如下图 15 所示。当数据到达时,事件时间会用摄入时间更新(即到达系统时的处理时间),从而导致向右水平移动到理想的 Watermark 线上。需要注意的是:

  • 与其他处理时间窗口示例一样,当输入的顺序发生变化时,我们也会得到不同的结果,即使输入的值和事件时间保持不变。
  • 与 Trigger 示例不同,窗口在事件时间上(因此沿着 X 轴)。尽管如此,它们并不是真正的事件时间窗口。我们只是将处理时间映射到事件时间上,删除每个输入的原始发生记录,并用一个新记录替换它,实际上并不是首次观察到数据的时间。
  • 尽管如此,由于有了 Watermark,触发器仍然在前面处理时间示例完全相同的时间点触发。此外,生成的输出值与 Trigger 示例的相同:左侧为 12、21、18,右侧为 7、36、4。
  • 由于使用摄入时间时可能会出现完美的 Watermark,实际 Watermark 与理想 Watermark 一样,以斜率为 1 的直线向右上方上升。

图15

我们可以看到实现处理时间窗口可以有不同的方式,但这里最大的收获是我自第一篇文章以来一直在强调的:事件时间窗口与顺序无关(在输入完成之前,实际的窗口会不断变化);然而处理时窗口不是这样的。如果你关心事件实际发生的时间,则必须使用事件时间窗口,否则你的结果将会毫无意义。

5.2 Where: 会话窗口

我们现在要看看我最喜欢的功能之一:动态的、数据驱动的窗口,称为会话窗口。会话窗口是一种特殊类型的窗口,会捕获数据中的一个活动周期(由不活动的间隔时间划分不同的活动周期)。这在数据分析中特别有用,因为可以提供用户在特定时间段内参与的某些活动。在会话中看到关联的活动,并根据会话的长度推断参与程度等。从窗口的角度来看,会话窗口在两个方面特别有趣:

  • 这是一个数据驱动窗口的示例:窗口的位置和大小与输入数据本身由直接的关系,而不是像固定窗口和滑动窗口那样基于时间上的某种预定义模式。
  • 这也是一个非对齐窗口的示例:这种窗口没有统一地应用到所有数据上,而只是应用到该数据的一个特定子集(如,每个用户)。 这与固定窗口和滑动窗口等对齐窗口形成鲜明对比,后者通常均匀地应用于整个数据集。

对于某些用例,可以提前使用通用标识符对单个会话中的数据进行打标。在这种情况下,会话更容易构建,因为基本上只要按照 Key 分组就好了。然而,在更一般的情况下(即,实际会话本身并不提前知道),会话必须仅根据在时间范围上的位置来构建。在处理乱序数据时,这变得特别棘手。

提供一般会话的关键是,根据定义,完整的会话窗口是一组较小的重叠窗口的组合,每个窗口包含一条记录,序列中的每条记录与下一条记录之间的间隔不超过预定义的超时时间。因此,即使我们观察到会话中的有乱序数据,我们也可以简单地通过将重叠的窗口合并在一起来构建最终会话,以便在单个数据到达时将它们合并在一起。

图16

让我们看一个示例,使用代码 8 中启用了撤回的 early/late 代码,并改为会话窗口:

// 代码11
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.accumulatingAndRetractingFiredPanes())
.apply(Sum.integersPerKey());

在流引擎上执行,你会得到如下图 17 所示的内容:

图17

这里做了很多事情,所以我与你们一起看一下:

  • 当遇到第一个值为 5 的记录时,会被放置到一个原始会话窗口中,该窗口从该记录的事件时间开始并横跨会话间隙持续时间的宽度,例如,超过该数据发生点一分钟。后面再遇到与这个窗口重叠的任何窗口都应该是同一会话的一部分,会被合并到这个窗口中。
  • 第二个到达的记录是 7,同样被放置在它自己的原始会话窗口中,因为它不与 5 的窗口重叠。
  • 同时,Watermark 已经超过了第一个窗口的末尾,所以值 5 在 12:06 之前作为 on-time 结果输出。此后不久,就在处理时间达到 12:06 时,第二个窗口也输出值为 7。
  • 接下来,我们观察到一系列记录:3、4 和 3,它们的原始会话出现了重叠。最终,它们全部合并在一起,并在 12:07 时遇到了 early 触发器触发,输出一个值为 10 的窗口。
  • 8 很快到达,与值为 7 的原始会话和值 10 的合并会话重叠。因此,这三个需要合并在一起,形成一个新的组合会话,值为 25。当 Watermark 到达这个会话结尾时,输出新会话值 25 以及先前输出但后来合并到其中的两个窗口的撤回:7 和 10。
  • 当迟到的数据 9 到达时,同值为 5 的原始会话和值为 25 的合并会话再合并为一个更大的值为 39 的新会话。39 以及 5 和 25 窗口的撤回都在迟到数据触发器触发时立即输出。

这非常强大。真正令人敬畏的是,在一个模型中描述这样的事情是多么的容易,该模型将流处理的维度分解为不同的、可组合的部分。最后,你只需要更多地关注手头的业务逻辑,不用关注将数据塑造成某种可用形式的细节。如果你不相信我,可以查看这篇博文:如何在 Spark Streaming 上手动建立会话(请注意,这样做并不是为了指责他们做的不好;Spark 的人在其他所有方面都做得很好)。

6. 终结篇,感觉好极了!

到此,我已经完成了所有示例。你现在已经深入了解强大的流处理的基础,并准备好走向这个流处理世界并做出一些令人兴奋的事情。但在你离开之前,我想快速回顾一下我们所涵盖的内容,以免你匆忙忘记其中的任何内容。首先,我们涉及的主要概念:

  • 事件时间与处理时间:最重要的区别是一个是事件发生时间;一个是数据处理系统观察到事件时间。
  • 窗口:沿时间边界切分是管理无限数据的常用方法(按照处理时间或事件时间,尽管我们在 Dataflow 模型中仅使用事件时间)。
  • Watermarks:衡量事件时间进度的一个非常有用的概念,提供了一种在操作无限数据的无序处理系统中推测完整性的方法。
  • 触发器:一种告知系统何时可以输出数据的声明机制。在某些场景中十分重要。
  • 累积模式:单个窗口在演变多次物化时,不同修正结果之间的关系。

其次,我们用来构建框架的四个问题(我承诺这是最后一次说它们):

  • What:计算逻辑是什么?= transformations
  • Where:计算的事件时间范围?= 窗口
  • When:什么时候(处理时间)输出结果?= watermarks + triggers
  • How:如何修正相关结果?= accumulation

第三,也是最后一点,为了将这种流处理模型所提供的灵活性带回家(因为最后,这才是真正的意义:权衡正确性、延迟和成本三者之间的关系),回顾一下:我们只要修改一点点代码就能实现在相同数据集上各种不同的产出:

图18

原文: Streaming 102: The world beyond batch

赏几毛白!