类型转换
之前几个月一直在忙一个binlog抽取的项目,将mysqlbinlog拉出来,写入kafka,
之后消费kafka中的数据写到HDFS,文件格式为orcfile+snappy。
背景
上一篇blog讲过kafka中的数据是以avro为载体的,其中的数据字段存在一个map中,
map的key和value都是charsequence,也就是说我们的数据在经过avro之后失去了类型信息。
我们的目标端是写入hdfs上的orcFile,如果是写sequencefile,基本上我们就不关心类型了。
阿里早期的数据仓库中,几乎所有的字段类型都是string的,这样做显然会有空间的浪费,
但也比较方便,不容易出错,方便管理。在前一家公司里,我负责的离线数据都是以parquet为主的,
orc相比parquet查询的性能会更快一些。
类型转换
orc是有字段类型概念的,那么我们如何将string转成具体的类型呢?
首先我们要知道原始类型(mysql中的类型),还要知道hive表中的字段类型(orc类型)。
知道了这两端的类型,我们就有可能完成这个工作了。最简单的处理办法就是笛卡尔积。
把每种组合的处理函数写好,进行配置就可以了。这样做的缺点非常明显,就是工作量大。
如果有一天我们不再使用orc格式的类型,换成parquet或者其他的,那么还需要大量的重复工作。
于是解决这个问题的关键是降低耦合度,降低复杂度。
之前在阅读阿里开源的datax的时候看到过一个类似问题的解决方案,引入状态机。
mysql的常见字段类型大概不到20种,为每种类型创建一个type类,并提供转成其他类型的方法。
public interface JavaType {
Boolean toBoolean(String value);
Integer toInt(String value);
Long toLong(String value);
Date toDate(String value);
Float toFloat(String value);
Double toDouble(String value);
default String toString(String value) {
return value;
}
default void unsupport() {
throw new RuntimeException("type error");
}
}
碰到把boolean转date显然是不可能实现的,那么就直接unsupport好了。
接下来解决orc端的问题
TINYINT(new C_LongColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createByte();
}
}, SMALLINT(new C_LongColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createShort();
}
}, INT(new C_LongColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createInt();
}
}, BIGINT(new C_LongColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createLong();
}
}, BOOLEAN(new C_BooleanColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createBoolean();
}
}, FLOAT(new C_DoubleColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createFloat();
}
}, DOUBLE(new C_DoubleColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createDouble();
}
}, DECIMAL(new C_DecimalColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createDecimal();
}
}, STRING(new C_BytesColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createString();
}
}, BINARY(new C_BytesColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createBinary();
}
}, CHAR(new C_BytesColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createChar();
}
}, VARCHAR(new C_BytesColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createVarchar();
}
}, TIMESTAMP(new C_TimeStampColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createTimestamp();
}
}, DATE(new C_DateColumnVector()) {
@Override
public TypeDescription toOrcTypeDescption() {
return TypeDescription.createDate();
}
};
private Convert convert;
OrcTypeEnum(Convert convert) {
this.convert = convert;
}
public static OrcTypeEnum findType(String type) {
return OrcTypeEnum.valueOf(type.toUpperCase());
}
public abstract TypeDescription toOrcTypeDescption();
public void setValue(ColumnVector vector, int row, String value, JavaType typeConvert) {
convert.eval(vector, row, value, typeConvert);
}
为orc的每种类型创建一个convert,大概十来种的样子。
通过这种方式,我们不仅解决了正常的类型转化需求,还能够天然支持date to long 和long to date这样的复杂需求,
极大的提高了工具的灵活度。
函数化
之前datax的方案是引入一个实体类对原始类型的数据进行包装,如果每一个字段都经过一次包装会严重增加体积,
ygc的频率会提高,所以在我们的方案中是函数化的,相关类型进行编号,性能也有一些提升。
public void writeData(List<String> result, List<ColumnInfo> columnInfos, List<JavaType> javaTypeList) {
int row = this.batch.size++;
for (int i = 0; i < result.size(); i++) {
ColumnInfo col = columnInfos.get(i);
col.getHiveTypeEnum().setValue(batch.cols[i], row, result.get(i), javaTypeList.get(col.getJavaTypeFunctionId()));
}
}