书接上回,我们已经将日活的维度数据写入Redis的Bitmap结构中。

今天再设计一个简单的查询引擎,通过表达式转化为Redis的指令集,最终得到日活的结果。

设计一个简单的表达式

最通用的表达式就是SQL了,Hive、Drill、Presto、Phoenix等开源项目都在做类似的事情。

我们这个筛选维度求日活的功能,没有那么复杂,所以简单一些就好。

(C1 = 3 & C2 = 1) & (C5 = 1 | C8 = 6)

其中C(N)代表选择第几个维度,等号后面代表维度的值,

“&”代表AND操作,“|”代表OR操作,括号表示结合顺序。

这个表达式的含义我想很容易理解,跟四则混合运算差不多。

表达式的解释器

下面看看我们如何解析这个表达式,将表达式转化为Redis执行的指令。

之前玩了一段时间的Lpeg,也做过一些小东西,这里还是用Lpeg来做,

在四则混合运算的例子上加以改进即可。

下面是对表达式的定义。

function Blk(p)
  return p * V "Space"
end

local G = P{
    V "Space" * V "Stmt" ;
    Stmt      = Cf(V "Group" * Cg(V "LogicSig" * V "Group")^0 , operation) ,
    Group     = V "Element" + V "Open" * V "Stmt" * V "Close" ,
    Element   = Cg(Blk(V "ColNum") * Blk(V "EqSignal") * Blk(V "ColVal") / selector) ,
    LogicSig  = Blk(C(S "&|")),

    ColNum    = P "C" * C(R "09"^1) ,
    EqSignal  = C(P "=") ,
    ColVal    = C((R "az" + R "AZ" + R "09")^1) ,

    Open      = Blk(P "(") ,
    Close     = Blk(P ")") ,
    Space     = S(" \n\t")^0 ,
}

在解析到 C1=1 类似的结构时用selector函数处理

function selector(colNum , op , colVal)
  lastKey = {"b" , colNum .. '-' .. colVal}
  return lastKey
end

解析到 XXX & YYY 或者 XXX | YYY 的结构时用operation函数处理

function operation(left , op , right)
  local action = ""
  if op == "&" then 
    action = "AND"
  elseif op == "|" then
    action = "OR"
  end
  local tmpKey = "__TMP-" .. tmpCount
  lastKey = {"t" , tmpKey}
  table.insert(recycle , tmpKey)
  tmpCount = tmpCount + 1
  table.insert(process , {"BITOP" , action , tmpKey , left[1] , left[2] , right[1] , right[2]})
  return {"t" , tmpKey }
end

因为在Bitmap运算过程中,我们会产生一些临时的Bitmap数据,以__TMP开头的Key,

这些Key在运算执行结束后,需要清理掉,我们将这些Key都记录在recycle数据里。

下面看一个表达式解析的过程:

(C1 = 3 & C2 = 1) & (C5 = 1 | C8 = 6)

--------------------
C1	=	3
C2	=	1
table: 0x41760200	&	table: 0x417553e0
C5	=	1
C8	=	6
table: 0x417600f0	|	table: 0x41760178
table: 0x4175db68	&	table: 0x4175da68

再看看解析的结果:

lastKey : {"t" , "__TMP-3"}
process : [
           ["BITOP","AND","__TMP-1","b","1-3","b","2-1"],
           ["BITOP","OR","__TMP-2","b","5-1","b","8-6"],
           ["BITOP","AND","__TMP-3","t","__TMP-1","t","__TMP-2"],
           ["BITCOUNT",["t","__TMP-3"]]
          ]
recycle : ["__TMP-1","__TMP-2","__TMP-3"]

说明:其中的”t”代表是临时变量的key , “b”代表的是需要结合Block块的序号的Key。

如果完善一些的话,我们应该对逻辑表达式进行一下处理,也就是执行计划的优化。

这里只是一个Demo,所以我们先跳过这一块,先把流程跑通。

将执行计划推送给Redis

前一篇Blog,我们已经写了一个封装查询的LuaScript了,LuaScript会根据

Nest结构中的数据,遍历所有的Block块,在每个Block块中进行,逻辑表达式的运算,

最后把UV都SUM起来,就是返回的日活结果。

这里我将process和recycle的数据都打包成了json(redis的lua可以使用cjson)。

下面是重写的ReadScript

local scriptRead = redis:script("LOAD" , 
[[
  local result = 0
  local tableName , dateTime = KEYS[1] , KEYS[2]
  local process , recycle    = cjson.decode(ARGV[1]) , cjson.decode(ARGV[2])
  local key4N = table.concat({tableName , "-" , dateTime , "-Nest"})

  for index , value in ipairs(process) do
    local params
    if value[1] == "BITOP" then
      params = {value[1] , value[2] , value[3]}
      for seq = 4 , #value , 2 do
        if value[seq] == "b" then
          table.insert(params , {tableName , "-" , dateTime , "-" , value[seq + 1] , "-BMP-" , 7})
        elseif value[seq] == "t" then
          table.insert(params , value[seq + 1])
        end
      end
    elseif value[1] == "BITCOUNT" then
      params = {value[1]}
      if value[2] == "b" then
          table.insert(params , {tableName , "-" , dateTime , "-" , value[3] , "-BMP-" , 7})
        elseif value[2] == "t" then
          table.insert(params , value[3])
        end
    end
    process[index] = params
  end
  
  local bytepos = 0
  local pos = redis.call("bitpos" , key4N , 1 , bytepos)
  while (pos >= 0)
  do
    for g = pos , pos + 7 - bit.band(pos , 7) do
      local fill = redis.call("getbit" , key4N , g)
      if fill == 1 then
        for index , value in ipairs(process) do
          local ps = {}
          for id , item in ipairs(value) do
            if type(item) == "table" then
              item[7] = g
              ps[id] = table.concat(item)
            else
              ps[id] = item
            end
          end
          if ps[1] == "BITCOUNT" then
            result = result + redis.call(unpack(ps))
          else
            redis.call(unpack(ps))
          end
        end
      end
    end
    bytepos = bit.rshift(pos , 3) + 1 
    pos = redis.call("bitpos" , key4N , 1 , bytepos)
  end
  if #recycle > 0 then
    redis.call("del" , unpack(recycle))
  end
  return result
]]
)

scripts:set("ReadScript" , scriptRead)
ngx.say("ReadScript : " .. scriptRead)

最后我们再改写一些Openresty对查询接口的封装

G:match(expression)
table.insert(process , {"BITCOUNT" , unpack(lastKey)}) 

local r = redis:evalsha(scripts:get("ReadScript") , 2 , tableName , dateTime , json.encode(process) , json.encode(recycle))
ngx.say(r)