如何查询Redis中的Bitmap数据
书接上回,我们已经将日活的维度数据写入Redis的Bitmap结构中。
今天再设计一个简单的查询引擎,通过表达式转化为Redis的指令集,最终得到日活的结果。
设计一个简单的表达式
最通用的表达式就是SQL了,Hive、Drill、Presto、Phoenix等开源项目都在做类似的事情。
我们这个筛选维度求日活的功能,没有那么复杂,所以简单一些就好。
(C1 = 3 & C2 = 1) & (C5 = 1 | C8 = 6)
其中C(N)代表选择第几个维度,等号后面代表维度的值,
“&”代表AND操作,“|”代表OR操作,括号表示结合顺序。
这个表达式的含义我想很容易理解,跟四则混合运算差不多。
表达式的解释器
下面看看我们如何解析这个表达式,将表达式转化为Redis执行的指令。
之前玩了一段时间的Lpeg,也做过一些小东西,这里还是用Lpeg来做,
在四则混合运算的例子上加以改进即可。
下面是对表达式的定义。
function Blk(p)
return p * V "Space"
end
local G = P{
V "Space" * V "Stmt" ;
Stmt = Cf(V "Group" * Cg(V "LogicSig" * V "Group")^0 , operation) ,
Group = V "Element" + V "Open" * V "Stmt" * V "Close" ,
Element = Cg(Blk(V "ColNum") * Blk(V "EqSignal") * Blk(V "ColVal") / selector) ,
LogicSig = Blk(C(S "&|")),
ColNum = P "C" * C(R "09"^1) ,
EqSignal = C(P "=") ,
ColVal = C((R "az" + R "AZ" + R "09")^1) ,
Open = Blk(P "(") ,
Close = Blk(P ")") ,
Space = S(" \n\t")^0 ,
}
在解析到 C1=1 类似的结构时用selector函数处理
function selector(colNum , op , colVal)
lastKey = {"b" , colNum .. '-' .. colVal}
return lastKey
end
解析到 XXX & YYY 或者 XXX | YYY 的结构时用operation函数处理
function operation(left , op , right)
local action = ""
if op == "&" then
action = "AND"
elseif op == "|" then
action = "OR"
end
local tmpKey = "__TMP-" .. tmpCount
lastKey = {"t" , tmpKey}
table.insert(recycle , tmpKey)
tmpCount = tmpCount + 1
table.insert(process , {"BITOP" , action , tmpKey , left[1] , left[2] , right[1] , right[2]})
return {"t" , tmpKey }
end
因为在Bitmap运算过程中,我们会产生一些临时的Bitmap数据,以__TMP开头的Key,
这些Key在运算执行结束后,需要清理掉,我们将这些Key都记录在recycle数据里。
下面看一个表达式解析的过程:
(C1 = 3 & C2 = 1) & (C5 = 1 | C8 = 6)
--------------------
C1 = 3
C2 = 1
table: 0x41760200 & table: 0x417553e0
C5 = 1
C8 = 6
table: 0x417600f0 | table: 0x41760178
table: 0x4175db68 & table: 0x4175da68
再看看解析的结果:
lastKey : {"t" , "__TMP-3"}
process : [
["BITOP","AND","__TMP-1","b","1-3","b","2-1"],
["BITOP","OR","__TMP-2","b","5-1","b","8-6"],
["BITOP","AND","__TMP-3","t","__TMP-1","t","__TMP-2"],
["BITCOUNT",["t","__TMP-3"]]
]
recycle : ["__TMP-1","__TMP-2","__TMP-3"]
说明:其中的”t”代表是临时变量的key , “b”代表的是需要结合Block块的序号的Key。
如果完善一些的话,我们应该对逻辑表达式进行一下处理,也就是执行计划的优化。
这里只是一个Demo,所以我们先跳过这一块,先把流程跑通。
将执行计划推送给Redis
前一篇Blog,我们已经写了一个封装查询的LuaScript了,LuaScript会根据
Nest结构中的数据,遍历所有的Block块,在每个Block块中进行,逻辑表达式的运算,
最后把UV都SUM起来,就是返回的日活结果。
这里我将process和recycle的数据都打包成了json(redis的lua可以使用cjson)。
下面是重写的ReadScript
local scriptRead = redis:script("LOAD" ,
[[
local result = 0
local tableName , dateTime = KEYS[1] , KEYS[2]
local process , recycle = cjson.decode(ARGV[1]) , cjson.decode(ARGV[2])
local key4N = table.concat({tableName , "-" , dateTime , "-Nest"})
for index , value in ipairs(process) do
local params
if value[1] == "BITOP" then
params = {value[1] , value[2] , value[3]}
for seq = 4 , #value , 2 do
if value[seq] == "b" then
table.insert(params , {tableName , "-" , dateTime , "-" , value[seq + 1] , "-BMP-" , 7})
elseif value[seq] == "t" then
table.insert(params , value[seq + 1])
end
end
elseif value[1] == "BITCOUNT" then
params = {value[1]}
if value[2] == "b" then
table.insert(params , {tableName , "-" , dateTime , "-" , value[3] , "-BMP-" , 7})
elseif value[2] == "t" then
table.insert(params , value[3])
end
end
process[index] = params
end
local bytepos = 0
local pos = redis.call("bitpos" , key4N , 1 , bytepos)
while (pos >= 0)
do
for g = pos , pos + 7 - bit.band(pos , 7) do
local fill = redis.call("getbit" , key4N , g)
if fill == 1 then
for index , value in ipairs(process) do
local ps = {}
for id , item in ipairs(value) do
if type(item) == "table" then
item[7] = g
ps[id] = table.concat(item)
else
ps[id] = item
end
end
if ps[1] == "BITCOUNT" then
result = result + redis.call(unpack(ps))
else
redis.call(unpack(ps))
end
end
end
end
bytepos = bit.rshift(pos , 3) + 1
pos = redis.call("bitpos" , key4N , 1 , bytepos)
end
if #recycle > 0 then
redis.call("del" , unpack(recycle))
end
return result
]]
)
scripts:set("ReadScript" , scriptRead)
ngx.say("ReadScript : " .. scriptRead)
最后我们再改写一些Openresty对查询接口的封装
G:match(expression)
table.insert(process , {"BITCOUNT" , unpack(lastKey)})
local r = redis:evalsha(scripts:get("ReadScript") , 2 , tableName , dateTime , json.encode(process) , json.encode(recycle))
ngx.say(r)