基于社区版1.5的定制化开发工作已经接近尾声了,最后一个大的Feature是背压指标的量化。

背压

流计算框架上都提供了背压相关的机制,但监控量化的形式各有不同。

Flink-UI上提供了一个背压探测的功能,基本原理就是Profiling。

在实际应用中,我们发现了很多问题,比如:

  • 只能观测当前,无法回溯历史;

  • 一次只能观测一个taskchain,没有全局概念;

  • 量化概念模糊,不利于告警预警;

  • 开销高,无法长期运行;

综上,我们决定开发自己的背压监控功能,并为用户提供预警告警的功能。

量化

我们定义背压的指标和之前fregata做伸缩容的指标是一个含义,内部称之为自旋时间,

简单来说就是数据处理线程等待eventbuffer的时间。

在flink中,主要修改如下几个类:


RecordWriter.requestNewBufferBuilder

LocalBufferPool.requestBufferBuilderBlocking

在LocalBufferPool的requestBuffer时,会进入一段while循环


while (availableMemorySegments.isEmpty()) 

    ...
    availableMemorySegments.wait(2000);
    ...

这块代码主要是我们修改的区域。

总的来说,我们统计了wait的时间和进入while循环的次数,作为背压的指标。

进入while循环次数多,我们认为stream是时断时续的,效率不高,类似堵车,

这类问题大多需要调节networkbuffer大小,并行度等可以有效解决。

wait的累加时间长(比如1min里有58s在wait),任务下游严重阻塞,

这类问题最大的可能是下游IO超时阻塞。

维度

我们的监控数据最后是进入Promethues的,有了指标和值,还需要一些维度信息。

Flink的Metric主要是task级和Operator级,所以我们只要参照flink的

metric定义方式,就获得了subtask的一系列标签(taskId,index,attemptId等)。

在此基础上,我们又将数据的target也作为了一个维度,就是RecordWriter中的

targetChannelId。如果一个task的下游有多个target,我们可以区分出到底是

哪一个导致背压的。

开销

我们增加的这些统计指标,在程序没法发生背压时是没有任何开销的,一旦发生背压,

就意味着程序的吞吐极具下降,那么相关逻辑的执行次数也非常少,costtime的计算

和累加计数器的操作开销也非常低,几乎可以忽略不计。

效果

监控数据打通后,我们在Promethues上进行了一下统计,发现了20多个任务存在背压,

我们也对比了Flink-WebUI上的探测结果,结论都是准确的,证明方案是可行有效的。