Flume优化
Flume是Apache大数据套件中比较亲民的,安装、配置简单,功能丰富,而且极易定制化。
Flume的整体设计也是非常值得学习的。之前美团在官方技术blog中,着重讲述了其在Flume
上的定制化开发和性能优化,也让Flume普及率大大提高了。
一般使用Flume主要是作为agent抓取日志或者从Kafka读取消息写入HDFS、Hbase等,
而作为消息通道的功能,这几年渐渐被性能卓越的Kafka所取代。
而在日志抓取领域,Flume的功能性和性能一直都不是最佳选择。
下面介绍一下我在Flume定制开发和性能优化中的一些实践。
KafkaSourceSink
在我的应用场景中,Flume主要是从Kafka拉取数据写入到HDFS上,还有就是从Kafka到Kafka。
Kafka这几年的发展很快,从比较流行的0.8.2.2到现在的0.10.X版本,API的兼容不是很好。
Flume的官方版本支持的是0.9.X,需要定制一下Kafka的版本支持。把KafkaSource和Sink
中,跟KafkaAPI打交道的方法都封装在抽象类中,提供多种版本的实现,这样就具备了不同Kafka
版本之间的消息迁移,主要用途就是在业务升级Kafka版本时,作为辅助的数据迁移工具,
业务方可以分别升级Producer端和Consumer端,减少升级的难度,控制升级的风险。
优化性能
整体来说Flume的性能并不高,很多组件为了实现通用性,都放弃了性能。
通过JFR的火焰图,我们找到了一些可以优化的点,下面列举一些。
HDFSEventSink
HDFSEventSink最大的问题是HDFSPath路径的渲染,将EventHeader里的Tag写入Path路径中,
还有一些预设的变量,比如年月日时分秒。这种渲染逻辑是每条数据都要执行的,所以开销非常大。
解决办法:
尽量少的使用变量,可以在Source端将路径的相关信息都拼凑好,写成一两个tag,
最后在sink端做简单的字符串拼接,性能至少可以提升两三倍。
注意:日期格式的渲染也要考虑,如果每条数据都将timestamp格式化成YMD也是非常耗时的,
可以适当引入cache来解决这个问题,比如将timestamp整除60000,缓存一分钟的格式化结果。
网上见到很多人的flume例子中,都是写TextFile到HDFS的,我是选择写的SequenceFile,
并且加了Snappy的压缩。SequenceFile比TextFile的好处就不讲了,网上有很多介绍。
另外sequenceFile也有一点点可以改的地方,将HDFSWritableSerializer的KeyClass修改掉,
Flume源码里是LongWritable,可以改成NullWritable。修改过的SequenceFile并不影响使用,
HDFS的文件大小会有百分之X的下降,相应的网络传输也会有一些收益。
Event、SimpleEvent
在Flume体系中,Event是数据的载体,一共有两部分组成,一个Header,一个Body。
Header是一个HashMap,这里的HashMap是可以优化的,参见我上一篇文章吧。
这里有一个陷阱需要注意,SimpleEvent的实现中,Header是每次创建时就new了一个HashMap。
按照Flume源码中Kafkasource的逻辑EventBuilder.withBody(eventBody, headers),
你在外面构建的headers(map),会在event初始化时复制一遍,这里可以优化一下。
ChannelProcessor
ChannelProcessor的实现比较丰富,但不一定你都需要用到,这里首先可以做了一下裁剪。
在我的Flume中不会用到optional和InterceptorChain,所以先将相关逻辑删掉了。
processEventBatch方法的参数events,在下面的实现逻辑中被复制到了reqChannelQueue里,
在我的Source中,一个batch操作的数据都是指向同一个channel的,所以这个复制也没有意义。
只需要判断第一个元素的header,选择req的Channel,直接执行Put即可。
如果一个batch的数据可能是流向不同channel的,那么尽量在source端拆分好。
source端承载数据的list是可以反复重用的,而且capacity也可以预设大小,避免resize。
最后
写好的代码如果你是以plugin的形式来开发flume的话,那上面的部分代码是没法生效的。
比如ChannelProcessor,有一个简单的办法,在不用修改Flume源码的情况下,就可以生效。
找到Flume/bin目录下的flume-ng启动脚本,在这里修改一下java -cp 后面的参数顺序,
保证你的plugin目录,在flume/lib目录之前,就可以优先加载所覆写的类了。