Avro优化
之前写过两篇blog,是关于项目使用avro过程中碰到的问题。
最近看了一下最新版本的avro1.9.x,很多之前的问题都已经得到了优化。
比如schema已经不需要考虑sync的性能问题了。
现有问题
大数据项目中Avro基本上是事实标准了,但是管理schema是一个非常痛苦的事情。
之前公司同事制定的kafka消息标准格式是一个avro类中包含一些常见固定字段和一个大的Map,
schema格式的差异都体现在map的key当中,value也规定是String类型。
这种方案基本上跟使用json没有区别,数据体积比较大,序列化反序列化的效率也不高。
代码中也充斥了大量的tostring和valueof,非常影响性能,也容易报错。
解决保留数据类型的问题
avro中的标准数据类型大概有7种,string、bytes、int、long、float、double、boolean。
每种类型都可能出现,但是每种类型数据出现的次数是不确定的,这个问题可以通过array来解决。
private java.util.List<java.lang.String> strings;
private java.util.List<java.nio.ByteBuffer> bytes;
private java.util.List<java.lang.Integer> ints;
private java.util.List<java.lang.Long> longs;
private java.util.List<java.lang.Float> floats;
private java.util.List<java.lang.Double> doubles;
private java.util.List<java.lang.Boolean> booleans;
元数据管理
因为数据写入了Array,所以字段名称就需要和index进行绑定,需要做简单的Meta管理功能。
其中每个column都有下面这样一个MetaEntity:
private String columnName;
private Schema.Type type;
private int indexOfall;
private int indexOftype;
固定字段
private long nid;
private java.lang.String sid;
private exchange.Opt opt;
private long timestamp;
private java.util.Map<java.lang.String,java.lang.String> tags;
private long metaId;
- nid和sid是用来标识当前这条数据的唯一Id
- opt是指这条数据的操作类型:insert、update或delete
- timestamp用来存放一个时间戳,一般值数据生成时间
- tags用来存放一些tag,比如数据的业务分类等
- metaId用来关联Meta
之前我写过一篇blog介绍过如何局部解析avro,为了方便实现filter功能,将tags放在
相对靠前的位置,方便快速的局部解析,进行数据过滤或者路由。
update类型数据
如果是update类型的操作,就存在更新前的数据和更新后的数据,参考binlog中的event设计,
我们也用一个bitset来记录哪些字段被更新了,在avro中用long来表示。
为了存放前后两个数据,我们将各种类型的数组进行了扩容(×2),通过数组下标来控制读写。
包装
最后我们将针对data的操作包装在一个datacontainer里,屏蔽复杂度。
public class DataContainer {
private Meta meta;
private Data data;
private BitSet updatesBitSet;
private int positionRatio = 1; //如果是update的话 ratio为2
数据写入的实现
int pos = metaEntity.getIndexOftype() * this.positionRatio;
switch (metaEntity.getType()) {
case STRING:
this.data.getStrings().set(pos, (String) val);
if (change) {
this.data.getStrings().set(pos + 1, (String) oldVal);
this.updatesBitSet.set(metaEntity.getIndexOfall());
}
break;
case BYTES:
this.data.getBytes().set(pos, (ByteBuffer) val);
if (change) {
this.data.getBytes().set(pos + 1, (ByteBuffer) oldVal);
this.updatesBitSet.set(metaEntity.getIndexOfall());
}
break;
结论
通过简单的测试,序列化后的数据体积减少了接近一半,序列化也加速了50%左右。