先简单介绍一下我们Flink的一个优化,关于asyncfunction的优化

AsyncFunction

阿里为flink社区提供了async的patch,为解决流计算中的IO性能提升带来了新思路,

但我个人觉得这个问题并没有真正解决,Asyncfunction更像是一个饮鸩止渴的方案。

随着异步线程的增加,很快会将外部服务打满,性能极具下降。

我们的优化思路是这样的,在与外部系统交互时,尽量使用小批量,而不是单条数据处理。

将async和batch结合起来,提升asyncfunction的效率。

具体的实现:

引入一个ringbuffer作为缓冲,asyncfunction在asyncinvoke时,将数据写入buffer中,

配置N个消费线程,消费ringbuffer里的数据,进行batch的积攒,并配置ringbuffer的timeout,

一次来实现N条最大M秒的批量数据积攒,在消费线程内,对积攒的批量数据进行处理,

返回结果的拆分,最后调用ResultFuture,将数据交还给asyncfunction。

通用BatchAsyncFunction的实现


  @Override
    public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        long seq = this.ringBuffer.next();
        Event event = this.ringBuffer.get(seq);
        event.data = input;
        event.resultFuture = resultFuture;
        this.ringBuffer.publish(seq);
    }


通用批量消费线程的抽象


public abstract class Processor<D, T, P> implements WorkHandler<Event<D, T>>, TimeoutHandler, LifecycleAware {

    protected AsyncConfig asyncConfig;

    private boolean initedBatch;

    private List<Triple<D, ResultFuture<T>, P>> batch;

    private long lastBatchTime;

    public Processor(AsyncConfig asyncConfig) {
        this.asyncConfig = asyncConfig;
        this.batch = Lists.newArrayList();
    }

    @Override
    public void onStart() {
        cleanAfterBatch();
    }

    @Override
    public void onEvent(Event<D, T> event) throws Exception {
        if (!this.initedBatch) {
            initBatch(false);
            this.initedBatch = true;
        }

        P param = buildBatchParams(event.data);
        this.batch.add(Triple.of(event.data, event.resultFuture, param));

        if ((this.batch.size() >= this.asyncConfig.getRingbufferMaxBatchSize()) || (System.currentTimeMillis() - this
                .lastBatchTime >= this.asyncConfig.getRingbufferLingerMs())) {
            execBatch(0);
            cleanAfterBatch();
        }
    }

    @Override
    public void onTimeout(long l) throws Exception {
        if (this.initedBatch) {
            execBatch(0);
            cleanAfterBatch();
        }
    }

    @Override
    public void onShutdown() {
        if (this.initedBatch) {
            execBatch(0);
            cleanAfterBatch();
        }
    }

    protected abstract void initBatch(boolean retry);

    protected abstract P buildBatchParams(D data);

    protected void cleanAfterBatch() {
        this.initedBatch = false;
        this.batch.clear();
        this.lastBatchTime = System.currentTimeMillis();
    }

    protected void execBatch(int times) {
        Validate.isTrue(this.asyncConfig.getAsyncMaxRetry() > times, "execbatch retry-times : " + times);
        try {
            if (times > 0) {
                initBatch(true);
            }
            commitAndGetResultsAndCloseBatch();
            this.batch.forEach(triple -> pushData(triple.getLeft(), triple.getMiddle(), triple.getRight()));
        } catch (Exception e) {
            try {
                Thread.sleep(this.asyncConfig.getAsyncRetryIntervalMs());
            } catch (InterruptedException e1) {
                throw new RuntimeException("Failed to exec batch", e1);
            }
            execBatch(times + 1);
        }
    }

    protected abstract void commitAndGetResultsAndCloseBatch() throws Exception;

    protected abstract void pushData(D data, ResultFuture<T> resultFuture, P param);

}