1. 概述
累加器(Accumulators
)是一个简单的构造器,具有加法操作和获取最终累加结果操作,在作业结束后可以使用。
最直接的累加器是一个计数器(counter
):你可以使用Accumulator.add()
方法对其进行累加。在作业结束时,Flink
将合并所有部分结果并将最终结果发送给客户端。在调试过程中,或者你快速想要了解有关数据的更多信息,累加器很有用。
目前Flink
拥有以下内置累加器。它们中的每一个都实现了累加器接口:
(1) IntCounter
, LongCounter
以及 DoubleCounter
: 参阅下面示例中使用的计数器。
(2) Histogram
:为离散数据的直方图(A histogram implementation for a discrete number of bins.)。内部它只是一个整数到整数的映射。你可以用它来计算值的分布,例如 单词计数程序的每行单词分配。
2. 如何使用
首先,你必须在你要使用的用户自定义转换函数中创建一个累加器(accumulator
)对象(这里是一个计数器):private IntCounter numLines = new IntCounter();
其次,你必须注册累加器(accumulator
)对象,通常在rich
函数的open()
方法中注册。在这里你也可以自定义累加器的名字:getRuntimeContext().addAccumulator("num-lines", this.numLines);
现在你就可以在算子函数中的任何位置使用累加器,包括在open()
和close()
方法中:this.numLines.add(1);
最后结果将存储在JobExecutionResult
对象中,该对象从执行环境的execute()
方法返回(当前仅当执行等待作业完成时才起作用):JobExecutionResult result = env.execute();
long lineCounter = result.getAccumulatorResult("num-lines");
System.out.println(lineCounter);
每个作业的所有累加器共享一个命名空间。因此,你可以在作业的不同算子函数中使用同一个累加器。Flink
在内部合并所有具有相同名称的累加器。
备注:目前累加器的结果只有在整个工作结束之后才可以使用。我们还计划在下一次迭代中可以使用前一次迭代的结果。你可以使用聚合器来计算每次迭代的统计信息,并基于此类统计信息来终止迭代。
3. Example
import com.google.gson.Gson; |
import com.qunar.innovation.data.TestFlink; |
3. 自定义累加器
为了实现你自己的累加器,你只需要编写你的Accumulator
接口的实现。如果你认为你的自定义累加器应与Flink
一起传输,请随意创建一个拉取请求(Feel free to create a pull request if you think your custom accumulator should be shipped with Flink.)。
你可以选择实现Accumulator或SimpleAccumulator。
Accumulator<V,R>
非常灵活:它为要添加的值定义一个类型V
,并为最终结果定义一个结果类型R
。例如,对于直方图,V
是数字,R
是直方图。SimpleAccumulator
适用于两种类型相同的情况,例如,计数器。
备注:Flink版本:1.4