初识flink5
先简单介绍一下我们Flink的一个优化,关于asyncfunction的优化
AsyncFunction
阿里为flink社区提供了async的patch,为解决流计算中的IO性能提升带来了新思路,
但我个人觉得这个问题并没有真正解决,Asyncfunction更像是一个饮鸩止渴的方案。
随着异步线程的增加,很快会将外部服务打满,性能极具下降。
我们的优化思路是这样的,在与外部系统交互时,尽量使用小批量,而不是单条数据处理。
将async和batch结合起来,提升asyncfunction的效率。
具体的实现:
引入一个ringbuffer作为缓冲,asyncfunction在asyncinvoke时,将数据写入buffer中,
配置N个消费线程,消费ringbuffer里的数据,进行batch的积攒,并配置ringbuffer的timeout,
一次来实现N条最大M秒的批量数据积攒,在消费线程内,对积攒的批量数据进行处理,
返回结果的拆分,最后调用ResultFuture,将数据交还给asyncfunction。
通用BatchAsyncFunction的实现
@Override
public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
long seq = this.ringBuffer.next();
Event event = this.ringBuffer.get(seq);
event.data = input;
event.resultFuture = resultFuture;
this.ringBuffer.publish(seq);
}
通用批量消费线程的抽象
public abstract class Processor<D, T, P> implements WorkHandler<Event<D, T>>, TimeoutHandler, LifecycleAware {
protected AsyncConfig asyncConfig;
private boolean initedBatch;
private List<Triple<D, ResultFuture<T>, P>> batch;
private long lastBatchTime;
public Processor(AsyncConfig asyncConfig) {
this.asyncConfig = asyncConfig;
this.batch = Lists.newArrayList();
}
@Override
public void onStart() {
cleanAfterBatch();
}
@Override
public void onEvent(Event<D, T> event) throws Exception {
if (!this.initedBatch) {
initBatch(false);
this.initedBatch = true;
}
P param = buildBatchParams(event.data);
this.batch.add(Triple.of(event.data, event.resultFuture, param));
if ((this.batch.size() >= this.asyncConfig.getRingbufferMaxBatchSize()) || (System.currentTimeMillis() - this
.lastBatchTime >= this.asyncConfig.getRingbufferLingerMs())) {
execBatch(0);
cleanAfterBatch();
}
}
@Override
public void onTimeout(long l) throws Exception {
if (this.initedBatch) {
execBatch(0);
cleanAfterBatch();
}
}
@Override
public void onShutdown() {
if (this.initedBatch) {
execBatch(0);
cleanAfterBatch();
}
}
protected abstract void initBatch(boolean retry);
protected abstract P buildBatchParams(D data);
protected void cleanAfterBatch() {
this.initedBatch = false;
this.batch.clear();
this.lastBatchTime = System.currentTimeMillis();
}
protected void execBatch(int times) {
Validate.isTrue(this.asyncConfig.getAsyncMaxRetry() > times, "execbatch retry-times : " + times);
try {
if (times > 0) {
initBatch(true);
}
commitAndGetResultsAndCloseBatch();
this.batch.forEach(triple -> pushData(triple.getLeft(), triple.getMiddle(), triple.getRight()));
} catch (Exception e) {
try {
Thread.sleep(this.asyncConfig.getAsyncRetryIntervalMs());
} catch (InterruptedException e1) {
throw new RuntimeException("Failed to exec batch", e1);
}
execBatch(times + 1);
}
}
protected abstract void commitAndGetResultsAndCloseBatch() throws Exception;
protected abstract void pushData(D data, ResultFuture<T> resultFuture, P param);
}