关于Mysql的Binlog抽取
这个月换了工作,双十一之前入职了京东大数据平台——实时数据组。
着手关于MysqlBinlog相关系统的重构工作,这次就讲讲Binlog吧。
Binlog转实时数据流的开源动态
伪装MysqlSlave来获取MysqlBinlog的技术这几年已经非常普及了,很多中等规模的公司都有相关的
基础技术组件,大多数是基于阿里几年前开源的Canal项目,在此基础上进行二次开发,将数据写入Kafka
或者其他MQ系统中去,阿里云的RDS还提供类似的服务,也非常省心。
17年年初的时候我也有意要做类似的项目,所以已经进行了最基础的技术选型和调研。
Java系中最早的binlog开源项目有tungsten-replicator 、open-replicator等,后来都慢慢废弃了。
之后Canal几乎统一了这个领域,但是阿里的这个开源项目最近几年的更新频率很一般。
最近两年比较新的项目有mysql-time-machine,zendesk的maxwell都不错。
shyiko的mysql-binlog-connector-java是一个BinlogClient的超精简内核,也被很多开源项目采用。
Mysql的Binlog
Mysql的Binlog协议和格式我就不多介绍了,网上可以搜到很多。
值得注意的时候,Mysql一定要配置成Row模式的,而且image需要配置成Full模式的。
我采用shyiko的mysql-binlog-connector-java的方案进行了一下性能测试,可以达到80M/s。
数据解析
shyiko的mysql-binlog-connector-java已经将数据解析成Java类型了,接下来只需要针对公司的规范
进行数据的清洗和整理即可,最后转成标准序列化格式,比如AVRO、Pb、Json等。
这中间性能损耗最大的地方就是序列化的开销了,要考虑异步或者多线程流等方式解决。
在开发解析逻辑时碰到了不少细节问题,用shyiko的mysql-binlog-connector-java解析的数据与canal
有一些细小的差别,比如datetime和timestamp字段解析的结果不一样,有可能受时区的影响;
还有像text类型的字段是byte数组,需要自己指定charset转为string;decimal也有差异,需要使用
numberformat进行格式化,group设置为false,并且setMinimumFractionDigits(1)。
写Kafka有序
将数据写到Kafka的速度是非常快的,但是因为Binlog的特殊性,需要一些设计,来保证数据的有序。
Kafka的Client会把收到的数据根据meta进行进行重新组织,按照broker的使用情况进行分批发送。
所以我们要根据业务情况指定Kafka的MessageKey,将相同业务含义的数据写到同一个Partition
下,保证有序,乱序会导致最新值被老值覆盖。
在写Kafka的过程中难免会有一些失败的情况发生,错误的重试机制由Kafka来保证,并且要强制设置
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 1,以防止重试时发生数据顺序错误。
Task的结构
P1 | B1 | P2 | B2 | P3 | ||||
---|---|---|---|---|---|---|---|---|
单线程拉Binlog | > | Ringbuffer | > | 单线程转化成AVRO对象 | > | Ringbuffer | > | 序列化并发送Kafka |
> | Ringbuffer | > | 序列化并发送Kafka |
注释:P一般对应一个线程,B对应一个Disrupter的Ringbuffer。
P1和P2环节的性能都非常好,所以采用的是单线程,阶段并行模式。
P3阶段采用并行模式,P2处理好的数据根据一些key进行选择,路由到N个B2中去,一般可以用库表名称。
压测时,这个Task跑起来之后可以让负载达到400%,基本上把Cpu资源跑满了。
持久化位点信息
什么是位点信息?
简单来说就是binlog的filename和offset,为了保证at least once,
我们需要定期记录消费的位置,以便任务重启之后,继续消费,不丢数据(可有少量的重复)。
如果P3阶段是单线程的,那么记录位点非常简单,提交kafka成功后,记录最后一条数据的offset,
可以异步刷到远程存储中去,防止阻塞数据流。可是,现在P3阶段是多线程的,如何记录位点呢?
定期让P2对所有的B2发送一条相同的含有位点信息的心跳包(位点值为P2最后处理的一条数据的位点)
通过这种方式,将各个P3的信息进行更新,每次P3将数据成功发送到Kafka后就将其中的心跳包记录下来,
作为一个可信的回退点,注意这个心跳包一定要在数据流中流转,起到挤压数据的作用,
防止某些P3无数据的情况发生。相关代码参考我的sucker项目,这里就不介绍代码细节了。
解决这个问题的思路源于Flink介绍文章中的一段话:
Stream Barrier是Flink分布式Snapshotting中的核心元素,它会作为数据流的记录被同等看待,
被插入到数据流中,将数据流中记录的进行分组,并沿着数据流的方向向前推进。每个Barrier会携带
一个Snapshot ID,属于该Snapshot的记录会被推向该Barrier的前方。因为Barrier非常轻量,所以
并不会中断数据流。带有Barrier的数据流。