今天聊聊双流的实时计算问题,我个人觉得算是流处理中最复杂的一个场景了。

想要在两个Source同时作为input时,保证流处理的恰好一次计算,仅仅依靠chandy-lamport是不够的。

两个Source的input交叉顺序是随机的行为,在业务上有严格的恰好一次要求,且代码还做不到

顺序不敏感时,怎么办呢?

合并流

最简单的办法是将两条流合并,将不确定的交叉行为给确定了,这样可以用chandy-lamport了。

合并流会增加一个merge任务和一个新的kafka topic,相当于增加了一个环节、增加了一点延迟。

虽然会浪费一点资源,但是简单高效。如果两个Source的Merge依据是Event Time,那么情况还

要更复杂一些。单纯的把两条流Merge在一起并不能解决问题,还需要在一个时间窗口内进行缓冲排序。

流转批

将流式处理转为批量处理也是可以解决这个问题的,以窗口1min为例。将两个Source的数据都

缓存1min(内存里或者依赖外部存储),当窗口关闭时发起对这期间的数据进行处理。

在处理时可指定某种确定的数据排序方法,保证处理顺序的一致性。这种方式只适用于数据量不大,

对时效性要求不高的场景。

限位器

不想合并出一个新的topic,也不想延迟太高,就需要在Streaming的模式下进行两个流的协调和缓冲。

驱动流

首先,在两个Source中选出来一个,作为时间驱动的,要求是他的数据时间具备单调性,也就是有序的。

在实际生产中,我们很难保证写入kafkaTopic里的数据一直单调有序,除非启用transaction。

但是我个人觉得使用事务造成的问题远大于收益。我这里说的单调性和有序是相对的,应该叫整体有序,

可以接受局部的重复和回退。当我们实时处理这个Source的数据时,有内存状态记录这上一条的时间

或者序号进行过滤,一旦发生了回退或者重复的数据就直接跳过。这样我们在经过filter后,就得到了

一条绝对有序的驱动数据流A,(后面简称为A)。注意,如果A在一定时间内没有数据产生,最好写入

带有时间戳的心跳数据包,以方便时间管理的协调控制后。

被动流

如果另外一个Source也是和驱动流一样,具备整体有序特征的话,就非常简单了。经过filter后,

与A汇合入同一个Operator中,只需要按照两个流的时间进度进行控制,以A的时间为主,这个时候

心跳数据包的作用就体现出来了,防止被动流的数据提前被加载进来。

如果被动流的数据,不具备单调性,就需要建立缓冲空间,将被动流的数据变的有序起来。

A可以适当延迟个几秒,等带被动流的窗口排序工作完成后再去驱动时间。同时也就是对被动流提出来

新的要求,那就是虽然可以乱序,但是时效性必须高。

协调器

一共需要四个组件:驱动流缓存池、被动流缓存池、被动流增量加载方法、被流动回溯方法。

缓存驱动流最近N条数据,主要是给被动流严重delay数据进行回溯时使用的。

如果被动数据没有严重delay,那么就写入到被动流缓冲池中。

驱动流有新数据时,触发被流动缓冲池中的数据进行增量加载。

总结

上面提到的方案有很大的局限性,还需要进一步的实践的检验。