Flink 事件时间与处理时间

Flink版本:1.11

Flink 在数据流中支持几种不同概念的时间。

1. 处理时间

Processing Time(处理时间)是指执行相应操作的机器系统时间,是操作算子在计算过程中获取到的所在主机的系统时间。当用户选择使用处理时间时,所有和时间相关的算子,例如 Windows 计算,在当前任务中所有的算子直接使用所在主机的系统时间。例如,一个基于处理时间按每小时进行处理的时间窗口将处理一个小时内(以系统时间为标准)到达指定算子的所有的记录。

处理时间是最简单的一个时间概念,不需要在数据流和机器之间进行协调。具有最好的性能和最低的延迟。然而,在分布式或者异步环境中,处理时间具有不确定性,因为容易受到记录到达系统速度的影响(例如,从消息队列到达的记录),还会受到系统内记录在不同算子之间的流动速度的影响。对数据乱序的处理,处理时间不是一种最优的选择。

总之,处理时间适用于时间计算精度要求不是特别高的计算场景。

2. 事件时间

Event Time(事件时间)是每个事件在产生它的设备上发生的时间。在进入 Flink 之前,事件时间通常要嵌入到记录中,并且事件时间也可以从记录中提取出来。对于事件时间,时间的进度取决于数据,而不是系统时钟。基于事件时间的程序必须指定如何生成事件时间的Watermarks,这是表示事件时间进度的机制。

在理想情况下,事件时间处理将产生完全一致且确定的结果,无论事件何时到达以及以什么样的顺序到达。但是,除非已知事件是按顺序(按时间戳)到达,否则事件时间处理在等待无序事件时产生一定的延迟。由于只能等待有限的时间,因此这限制了事件时间应用程序的确定性。

假定所有数据都已到达,事件时间算子将按预期方式运行,即使在处理无序、迟到事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将将处理事件时间在这一小时之内所有的记录,不管它们何时到达,以及它们以什么顺序到达。

按事件时间处理往往会导致一定的延迟,因为它要等待延迟事件和无序事件一段时间。因此,事件时间程序通常与处理时间操作相结合使用。

3. 摄入时间

Ingestion Time(摄入时间)是事件进入Flink的时间。在 Source 算子中,每个记录将 Source 的当前时间记为时间戳,基于时间的操作(如时间窗口)会使用该时间戳。

摄入时间在概念上处于事件时间和处理时间之间。与处理时间相比,摄入时间的成本稍微更高一些,但是可以提供更可预测的结果。因为摄入时间的时间戳比较稳定(在 Source 处只记录一次),同一数据在流经不同窗口操作时将使用相同的时间戳,然而对于处理时间,每个窗口算子可能将记录分配给不同的窗口(基于本地系统时钟以及传输延迟)。与事件时间相比,摄入时间程序无法处理任何无序事件或延迟事件,但程序不必指定如何生成watermarks

在内部,摄入时间与事件时间非常相似,但事件时间会自动分配时间戳以及自动生成watermark

4. 选择时间特性

Flink DataStream 程序的第一部分通常设置基本的时间特性。该设置定义数据流源的行为方式(例如,它们是否产生时间戳),以及窗口操作如KeyedStream.timeWindow(Time.seconds(30))应使用哪一类型的时间,是事件时间还是处理时间等。

以下示例展示了一个聚合每小时时间窗口内的事件的 Flink 程序。窗口的行为会与时间特性相匹配:

Java版本:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);

Scala版本:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))

stream
.keyBy( _.getUser )
.timeWindow(Time.hours(1))
.reduce( (a, b) => a.add(b) )
.addSink(...)

备注:为了以事件时间运行此示例,程序需要使用定义了事件时间并自动产生watermarks的 Source,或者程序必须在 Source 之后设置时间戳分配器和 watermarks 生成器。

欢迎关注我的公众号和博客:

原文:Timely Stream Processing

赏几毛白!