之前写过两篇blog,是关于项目使用avro过程中碰到的问题。

最近看了一下最新版本的avro1.9.x,很多之前的问题都已经得到了优化。

比如schema已经不需要考虑sync的性能问题了。

现有问题

大数据项目中Avro基本上是事实标准了,但是管理schema是一个非常痛苦的事情。

之前公司同事制定的kafka消息标准格式是一个avro类中包含一些常见固定字段和一个大的Map,

schema格式的差异都体现在map的key当中,value也规定是String类型。

这种方案基本上跟使用json没有区别,数据体积比较大,序列化反序列化的效率也不高。

代码中也充斥了大量的tostring和valueof,非常影响性能,也容易报错。

解决保留数据类型的问题

avro中的标准数据类型大概有7种,string、bytes、int、long、float、double、boolean。

每种类型都可能出现,但是每种类型数据出现的次数是不确定的,这个问题可以通过array来解决。

    private java.util.List<java.lang.String> strings;
    private java.util.List<java.nio.ByteBuffer> bytes;
    private java.util.List<java.lang.Integer> ints;
    private java.util.List<java.lang.Long> longs;
    private java.util.List<java.lang.Float> floats;
    private java.util.List<java.lang.Double> doubles;
    private java.util.List<java.lang.Boolean> booleans;

元数据管理

因为数据写入了Array,所以字段名称就需要和index进行绑定,需要做简单的Meta管理功能。

其中每个column都有下面这样一个MetaEntity:

    private String columnName;
    private Schema.Type type;
    private int indexOfall;
    private int indexOftype;

固定字段

    private long nid;
    private java.lang.String sid;
    private exchange.Opt opt;
    private long timestamp;
    private java.util.Map<java.lang.String,java.lang.String> tags;
    private long metaId;
  1. nid和sid是用来标识当前这条数据的唯一Id
  2. opt是指这条数据的操作类型:insert、update或delete
  3. timestamp用来存放一个时间戳,一般值数据生成时间
  4. tags用来存放一些tag,比如数据的业务分类等
  5. metaId用来关联Meta

之前我写过一篇blog介绍过如何局部解析avro,为了方便实现filter功能,将tags放在

相对靠前的位置,方便快速的局部解析,进行数据过滤或者路由。

update类型数据

如果是update类型的操作,就存在更新前的数据和更新后的数据,参考binlog中的event设计,

我们也用一个bitset来记录哪些字段被更新了,在avro中用long来表示。

为了存放前后两个数据,我们将各种类型的数组进行了扩容(×2),通过数组下标来控制读写。

包装

最后我们将针对data的操作包装在一个datacontainer里,屏蔽复杂度。

public class DataContainer {
    private Meta meta;
    private Data data;
    private BitSet updatesBitSet;
    private int positionRatio = 1; //如果是update的话 ratio为2

数据写入的实现

        int pos = metaEntity.getIndexOftype() * this.positionRatio;
        switch (metaEntity.getType()) {
            case STRING:
                this.data.getStrings().set(pos, (String) val);
                if (change) {
                    this.data.getStrings().set(pos + 1, (String) oldVal);
                    this.updatesBitSet.set(metaEntity.getIndexOfall());
                }
                break;
            case BYTES:
                this.data.getBytes().set(pos, (ByteBuffer) val);
                if (change) {
                    this.data.getBytes().set(pos + 1, (ByteBuffer) oldVal);
                    this.updatesBitSet.set(metaEntity.getIndexOfall());
                }
                break;

结论

通过简单的测试,序列化后的数据体积减少了接近一半,序列化也加速了50%左右。