初识flink3
Flink的更新非常快,1.5的小版本一直都有更新,十一国庆前,我们将自己的定制版分支merge了
官方的1.5.4,修复了taskmanager堆内外的内存比例,flink-stream使用堆外的量并不是很大,
我们配置了7:3。
AsyncIO
阿里给Flink社区提交了很多patch,印象比较深的是AsyncIO,在流式计算中IO导致性能瓶颈,
是最常见的一个现象,异步化的改造能提升不少性能。Async的Operator实现并不是很复杂,
简单来说就是创建一个buffer,将Stream数据填充进来并行处理,返回结果时控制一下数据的
顺序,跟我们最常见的单机多线程的数据处理本质上是一样的。
但是异步化会带来一个非常严重的后果,那就是IO的接收方会承受巨大的压力,通过调节buffer
的大小和并行度,我们轻松的获得了几千几万的并行能力,但是这个IO服务(无论是RPC还是DB)
接收了如性能压测般的流量,最终的结果就是IO服务拥塞,无法提供正常服务了。
AsyncIO的思路是没有问题的,但是各种Java世界中的各种Client的API风格并非为了Stream而生,
理想的API风格是kafka Producer的样子,流转批 + 异步回调。
Stream->Batch
为了能让AsyncIO发挥作用,我们需要收敛IO的次数,将多次IO合并为一次Batch,基于Batch再做
AsyncIO的操作,这样可以在减小并行度的情况下,能保证较高的吞吐量,减少协议部分的开销,
如果你用过redis的pipeline,差不多一样的道理。
当然Stream->Batch并不是那么简单的事情,需要考虑的细节非常多,比如对checkpoint的处理,
watermarket如何触发等,防止破坏了FLink本身的一致性。
我们的思路是将AsyncIO和Stream转Batch结合在一起实现,提供一层类似KafkaProducer风格的
API,将Stream->Batch转为标准的定义,最后结合AsyncIO的特征来实现,AsyncIO对watermarket
和checkpoint是有特殊处理的,这部分逻辑要尽量保持复用。
Batch Sink
所有任务Sink都是需要Batch特性的,如果是kafkaSink那真的非常简单,天然支持。我读了一下
业务方的代码,他们在Sink这个环节的实现都非常简陋,大多数都无法保证不丢数据,比如基于
一个简单的flatmap来实现。
于是我们开发了BatchSink抽象,提供时间、大小、条数的控制条件,还有checkpoint触发flush。
值得一提的是flush中的retry实现,批量操作失败如何重试呢?
这是一个非常好的问题,一年多前我在开发meepo时就有这样的困惑,当时主要是读写mysql,
我将mysql client的源码读了大半,实现了非常低开销的基于preparestatement的retry。
但这种方式并不一定适用于所有的client,于是我们在batchsink中嵌入了一个小的ringbuffer,
用来缓存这个批次的数据,在flush成功后清空ringbuffer。retry时复用handlerElement方法,
将这一批次的数据进行replay,当然也支持noreplay的retry方法。
protected void sendBatch(int times, boolean bySnapshot) {
Validate.isTrue(this.batchConfig.getMaxRetriesNum() > times, "sendbatch retry-times : " + times);
try {
if (times > 0) {
initBatch(true);
if (this.batchConfig.isNeedReplayWhenRetry()) {
int l = this.buffer.curSize();
for (int i = 0; i < l; i++) {
Optional<IN> record = this.buffer.get();
Validate.notNull(record);
IN e = Optional.empty() == record ? null : record.get();
handleRecord(e);
this.buffer.add(e);
}
}
}
flushAndClose(bySnapshot);
} catch (Exception e) {
LOG.error("FlushAndCloseError : ", e);
try {
Thread.sleep(this.batchConfig.getBatchDelayPeriodMs());
} catch (InterruptedException e1) {
throw new RuntimeException("Failed to send batchRecords", e1);
}
sendBatch(times + 1, bySnapshot);
}
}