我们最近merge了flink1.5.5的官方更新,并对batchsink进行深入开发。

Monkey

上次介绍我们的batchsink具备常规的功能,能够支持按照lingertime、数据大小、数据条数

进行batch的拆分。这个月我们在其中嵌入了一个叫monkey的字段,动态调节batch大小。

具体使用方法如下:

  public boolean checkFull() {
        return WP.get() - RP.get() == this.size - this.monkey;
    }

通过引入一个变量,实现了ringbuffer大小的动态调节。

动态调节的时机

我们将ringbuffer的每一圈的开始作为汇报监控指标和调节monkey值的时机。

int sq = mod(cur);
if (this.adjustment != null && sq == 0) {
  this.monkey = this.adjustment.onFirstEvent(ele, RING[mod(cur + 1)], (cur / this.size), this.monkey,
  this.size);
}

汇报时将当前写入数据的slot和即将被写入数据的slot当成参数传递个adjustment,

还提供了圈数、oldmonkey值和圈大小等信息,返回值是新的monkey值。

Adjustment

那么monkey的动态可以做什么呢?我们想实现对batchsize的自动化测试。

每当我们给一个开源的软件填写batchsize参数配置时,大多是拍脑袋出来的,

也可能做了一些简单的性能测试,这个过程比较枯燥,就是反复修改值,反复运行。

通过monkey的动态,我们可以将每一个batch大小进行几十次测试,然后按照

一个步长变更batch大小,继续进行测试,将原来手工的方式变成自动化的。

    public long onFirstEvent(Element<T> first, Element<T> second, long cycles, long oldMonkey, long maxSize) {
        if (cycles > 0) {
            log.warn("Cycle {} , Monkey {} , Cost {} ms , Volume {} , Record {} .", cycles, oldMonkey, first.getTimestampW() -
                    second.getTimestampW(), first.getAccVolume() - second.getAccVolume(), maxSize);

            this.metrics.add(new double[]{oldMonkey, (first.getTimestampW() - second.getTimestampW()), ((double) maxSize *
                    1000) / (first.getTimestampW() - second.getTimestampW())});

            if (cycles % this.intervalCycle == 0) {
                formulaFitting();

                if (maxSize - oldMonkey <= this.monkeyStepSize) {
                    this.result.forEach(doubles -> log.warn("evaluate : " + Arrays.toString(doubles)));
                    this.result.clear();
                    return this.roundTrip ? 0 : oldMonkey;
                } else {
                    return oldMonkey + this.monkeyStepSize;
                }
            }
        }
        return oldMonkey;
    }

简单的几行代码,我们就基本实现了这个需求。每运行N圈后,将monkey增加一个monkeystepsize。

评价

经过一系列的测试后,我们如何评估最佳的batchsize呢?

在运行时,我们已经将一些指标记录到了metrics里,包括monkey大小,跑一圈的cost,还有每秒吞吐条数。

   double totalRate = 0;
        for (int i = 0; i < this.metrics.size(); i++) {
            totalRate += this.metrics.get(i)[2];
        }
        double avgRate = totalRate / this.metrics.size();
        double acc = 0;
        for (int i = 0; i < this.metrics.size(); i++) {
            acc += Math.pow(this.metrics.get(i)[2] - avgRate, 2);
        }
        this.result.add(new double[]{this.metrics.get(0)[0], totalRate / this.metrics.size(), acc / (this.metrics.size() - 1)});
        this.metrics.clear();

然后我们对metrics进行一些简单的数据分析,计算均值还有方差,来评价最优解。

在我们的测试中,平均速度差距不大,但是方差有明显的变化趋势,随着batch大小的增长,

方差越来越大,通过观察jvm的内存变化推测为,由于batchcache的数据量增大,数据更

容易进入到old区,导致fgc的频率提高,ygc的时间变长,导致速度的波动较大。