初识flink4
我们最近merge了flink1.5.5的官方更新,并对batchsink进行深入开发。
Monkey
上次介绍我们的batchsink具备常规的功能,能够支持按照lingertime、数据大小、数据条数
进行batch的拆分。这个月我们在其中嵌入了一个叫monkey的字段,动态调节batch大小。
具体使用方法如下:
public boolean checkFull() {
return WP.get() - RP.get() == this.size - this.monkey;
}
通过引入一个变量,实现了ringbuffer大小的动态调节。
动态调节的时机
我们将ringbuffer的每一圈的开始作为汇报监控指标和调节monkey值的时机。
int sq = mod(cur);
if (this.adjustment != null && sq == 0) {
this.monkey = this.adjustment.onFirstEvent(ele, RING[mod(cur + 1)], (cur / this.size), this.monkey,
this.size);
}
汇报时将当前写入数据的slot和即将被写入数据的slot当成参数传递个adjustment,
还提供了圈数、oldmonkey值和圈大小等信息,返回值是新的monkey值。
Adjustment
那么monkey的动态可以做什么呢?我们想实现对batchsize的自动化测试。
每当我们给一个开源的软件填写batchsize参数配置时,大多是拍脑袋出来的,
也可能做了一些简单的性能测试,这个过程比较枯燥,就是反复修改值,反复运行。
通过monkey的动态,我们可以将每一个batch大小进行几十次测试,然后按照
一个步长变更batch大小,继续进行测试,将原来手工的方式变成自动化的。
public long onFirstEvent(Element<T> first, Element<T> second, long cycles, long oldMonkey, long maxSize) {
if (cycles > 0) {
log.warn("Cycle {} , Monkey {} , Cost {} ms , Volume {} , Record {} .", cycles, oldMonkey, first.getTimestampW() -
second.getTimestampW(), first.getAccVolume() - second.getAccVolume(), maxSize);
this.metrics.add(new double[]{oldMonkey, (first.getTimestampW() - second.getTimestampW()), ((double) maxSize *
1000) / (first.getTimestampW() - second.getTimestampW())});
if (cycles % this.intervalCycle == 0) {
formulaFitting();
if (maxSize - oldMonkey <= this.monkeyStepSize) {
this.result.forEach(doubles -> log.warn("evaluate : " + Arrays.toString(doubles)));
this.result.clear();
return this.roundTrip ? 0 : oldMonkey;
} else {
return oldMonkey + this.monkeyStepSize;
}
}
}
return oldMonkey;
}
简单的几行代码,我们就基本实现了这个需求。每运行N圈后,将monkey增加一个monkeystepsize。
评价
经过一系列的测试后,我们如何评估最佳的batchsize呢?
在运行时,我们已经将一些指标记录到了metrics里,包括monkey大小,跑一圈的cost,还有每秒吞吐条数。
double totalRate = 0;
for (int i = 0; i < this.metrics.size(); i++) {
totalRate += this.metrics.get(i)[2];
}
double avgRate = totalRate / this.metrics.size();
double acc = 0;
for (int i = 0; i < this.metrics.size(); i++) {
acc += Math.pow(this.metrics.get(i)[2] - avgRate, 2);
}
this.result.add(new double[]{this.metrics.get(0)[0], totalRate / this.metrics.size(), acc / (this.metrics.size() - 1)});
this.metrics.clear();
然后我们对metrics进行一些简单的数据分析,计算均值还有方差,来评价最优解。
在我们的测试中,平均速度差距不大,但是方差有明显的变化趋势,随着batch大小的增长,
方差越来越大,通过观察jvm的内存变化推测为,由于batchcache的数据量增大,数据更
容易进入到old区,导致fgc的频率提高,ygc的时间变长,导致速度的波动较大。