初识flink6
基于社区版1.5的定制化开发工作已经接近尾声了,最后一个大的Feature是背压指标的量化。
背压
流计算框架上都提供了背压相关的机制,但监控量化的形式各有不同。
Flink-UI上提供了一个背压探测的功能,基本原理就是Profiling。
在实际应用中,我们发现了很多问题,比如:
-
只能观测当前,无法回溯历史;
-
一次只能观测一个taskchain,没有全局概念;
-
量化概念模糊,不利于告警预警;
-
开销高,无法长期运行;
综上,我们决定开发自己的背压监控功能,并为用户提供预警告警的功能。
量化
我们定义背压的指标和之前fregata做伸缩容的指标是一个含义,内部称之为自旋时间,
简单来说就是数据处理线程等待eventbuffer的时间。
在flink中,主要修改如下几个类:
RecordWriter.requestNewBufferBuilder
LocalBufferPool.requestBufferBuilderBlocking
在LocalBufferPool的requestBuffer时,会进入一段while循环
while (availableMemorySegments.isEmpty())
...
availableMemorySegments.wait(2000);
...
这块代码主要是我们修改的区域。
总的来说,我们统计了wait的时间和进入while循环的次数,作为背压的指标。
进入while循环次数多,我们认为stream是时断时续的,效率不高,类似堵车,
这类问题大多需要调节networkbuffer大小,并行度等可以有效解决。
wait的累加时间长(比如1min里有58s在wait),任务下游严重阻塞,
这类问题最大的可能是下游IO超时阻塞。
维度
我们的监控数据最后是进入Promethues的,有了指标和值,还需要一些维度信息。
Flink的Metric主要是task级和Operator级,所以我们只要参照flink的
metric定义方式,就获得了subtask的一系列标签(taskId,index,attemptId等)。
在此基础上,我们又将数据的target也作为了一个维度,就是RecordWriter中的
targetChannelId。如果一个task的下游有多个target,我们可以区分出到底是
哪一个导致背压的。
开销
我们增加的这些统计指标,在程序没法发生背压时是没有任何开销的,一旦发生背压,
就意味着程序的吞吐极具下降,那么相关逻辑的执行次数也非常少,costtime的计算
和累加计数器的操作开销也非常低,几乎可以忽略不计。
效果
监控数据打通后,我们在Promethues上进行了一下统计,发现了20多个任务存在背压,
我们也对比了Flink-WebUI上的探测结果,结论都是准确的,证明方案是可行有效的。