Redis最早被大家熟知,是因为其高效的读写速率和丰富的数据结构。

无论是Web业务中做缓存和计数器,还是流计算做数据暂存容器,

应用都十分广泛。尤其是Bit有关的操作,在统计类业务中颇受欢迎。

Redis中常见的Bit操作

1、SetBit (写入)

2、GetBit (读取)

3、BitOp (逻辑运算)

4、BitPos (迭代)

5、BitField (特殊需求)

计算日活跃用户

计算日活是一个非常古老的话题,也是面试题目中很常见的一个问题。

其变种问题有:有限内存的数据排序,在线用户统计等等。

在网上有很多文章介绍如何利用Redis做高效的日活统计,请自行阅读。

使用Bit结构作为数据的存储,好处是高效、节省内存。

之前我写过一篇有关Java的Roaringbitmap的Blog,可以读一下。

维度分析的需求

单一的日活跃统计已经不是什么技术难题了,几乎每个网站或者App都有。

在实际工作中,运营人员希望看到的数据不是一个简单的统计值,而是能做

一些更为复杂的统计分析。比如,漏斗模型、多维度组合分析等等。

通过分析来指导业务的优化,策略的修正。

现在也有很多公司已经积累了大量的用户特征数据,开始完善用户的画像,

通过用户的特征来优化业务逻辑,提升用户体验,提高商业价值。

这类需求简单概括一下就是,我们需要一个列式的存储,方便增加新列,

需要类似Bitmap的索引,能够高效完成多个维度的快速筛选。

以Bitmap索引为核心技术的Druid,是一个非常好的选择,被众多互联网

公司部署,用来做运营分析、广告分析、实时监控。Druid有良好的扩展性,

延迟比较低,实时强,查询速度快,在多维度分析里表现尤其优异。

缺点就是架构比较复杂,硬件成本比较高。也有公司选择ES和Kylin等。

Redis AND Lua

Redis支持加载Lua脚本,让一些原生难以满足的需求可以更优雅的实现。

使用过Redis的人都知道,通过批量提交可以提升写入效率,假如我们的需求

是先Get,根据业务逻辑再Put,就很难批量执行,很难保证原子性。

如果你碰到了迭代的需求,需要反复查询Redis,那么效率也会大打折扣。

Redis提供的LuaScript扩展是一个非常不错的选择,我们可以把业务逻辑

封装在Lua的函数中,把业务逻辑放在离数据更近的地方,类似Hbase的

协处理器的思路,而且Redis本身是单线程的,就不用再处理原子性问题了。

网上有很多Redis调Lua的例子,这里我就不详细介绍了。

Redis提供了Bit操作,又是典型的内存数据存储,如果数据量不是特别大,

能不能用Redis+Lua实现多维度分析日活的功能呢?下面我进行一些尝试。

如何构建Bitmap索引

用户标识

因为是分析日活跃用户,所以采用UserId作为唯一标识,有一些公司的UserId

是UUID,那就需要进行一次转化,考虑到节省Bitmap的问题,ID尽量连续。

维度值

一个用户身上可能有十几个甚至几十个维度属性值,原则上每个值都要单独构建

一个Bitmap,所以还需要维护维度的Value列表。如果维度值过于丰富就会导致

巨大的存储开销。之前在看Kylin的Blog时,注意到有用一个TrieTree的结构

维度Value字典信息,好处是编码ID的信息具备ASCII的连续性,对Between逻辑

的执行优化有帮助。在Redis里维护TrieTree有点麻烦,我们暂时用Set替代。

Key的结构是:T-(命名空间)-(日期)-(维度序号)-VS
Value是:Redis的Sets

维度值维护在Set里,通过控制Set的大小,来限制膨胀的发生,如果这种逻辑

在外面已经处理过了,这里也可以简化掉,直接进行下一步操作。

分块的Bitmap

假如我们有一亿个用户ID,每个维度值都维护容量一亿的Bitmap就非常大了。

为了减小体积,我们将整体用户进行切分。切分的粒度小有助于节省内存,

但是会导致查询性能变差,粒度大则会浪费更多的内存。如果Redis的Bitmap

能够有Roaringbitmap的优化效果就完美了,对Bitmap的压缩可以参考一下

Roaringbitmap的源码,这里就不细说了。

块的大小可以根据自己的数据规模来确定,一般选择16384或者65536等。

blockSeq , lineSeq = bit.rshift(uidNumber , 14) , bit.band(uidNumber , 16383)

之所以取2的N次方,是为了更快的处理分块的序号和偏移,利用位移和与运算。

Key的结构是:T-(命名空间)-(日期)-(维度序号)-(维度值)-BMP-(块序号)
Value是:Redis的Bit

因为做了分块的操作,所以我们还需要维护一个块信息的列表,方便我们遍历计算。

一般来说会在这个块信息列表上存一些统计信息,比如总数。如果没有什么附加信息的话,

我们仍然可以用Bitmap来维护这个列表,节省一点内存开销。

Key的结构是:T-(命名空间)-(日期)-Nest
Value是:Redis的Bit

题外话:Sybase之前有一个专利叫Bit-Wise,可以解决高基维和Between问题,

参考1 参考2

今后我们再讨论这个话题。

如何查询

一般查询的条件就是 col1 = 3 and col4 = true and col10 = abc 这个样子。

所以我们要根据维度的序号和维度的值,找到对应的BitmapKey,参见之前的key结构。

最后我们要根据分块的列表进行遍历,汇总遍历的结果返回。

伪代码如下:

for index in blocksArray
  AND result col1-3-index col4-true-index col10-abc-index
end
bitcount result 
del result

遍历Block列表的时候,我用到了BitPos,这个操作的参数非常麻烦,

参数是byte级别的,返回值是bit,需要自己加逻辑去控制一下。

local scripts = ngx.shared.scripts
local shar = redis:script("LOAD" , 
[[
  local result = 0
  local tableName , dateTime = KEYS[1] , KEYS[2]
  
  local keyT4N = {"T-" , tableName , "-" , dateTime , "-Nest"}
  local key4N = table.concat(keyT4N)

  local subKeys , subKeysT = {} , {}
  for index = 1 , #ARGV , 2 do
    table.insert(subKeysT , {"T-" , tableName , "-" , dateTime , "-" , ARGV[index] , "-" , ARGV[index+1] , "-BMP-" , 10})
  end

  local bytepos = 0
  local pos = redis.call("bitpos" , key4N , 1 , bytepos)
  while (pos >= 0)
  do
    for g = pos , pos + 7 - bit.band(pos , 7) do
      local fill = redis.call("getbit" , key4N , g)
      if fill == 1 then
        for index , val in ipairs(subKeysT) do
          val[10] = g
          subKeys[index] = table.concat(val)
        end
        redis.call("bitop" , "AND" , "_TMP_" , unpack(subKeys))
        local uv = redis.call("bitcount" , "_TMP_")
        result = result + uv
      end
    end

    bytepos = bit.rshift(pos , 3) + 1 
    pos = redis.call("bitpos" , key4N , 1 , bytepos)
  end
  redis.call("del" , "_TMP_")
  return result
]]
)
scripts:set("ReadScript", shar)

数据写入

其实就是将行转列的过程

local scriptWrite = redis:script("LOAD" , 
[[
  local blockSize , blockPower , intSize = 2^15 , 15 , 2^31
  local expireTime = 7 * 86400

  local tableName , dateTime , uidNumber , append = KEYS[1] , tonumber(KEYS[2]) , tonumber(KEYS[3]) , KEYS[4] == "1"
  local expire = (dateTime > 0)
  local blockSeq , lineSeq 

  if uidNumber >= intSize then
    blockSeq , lineSeq = math.modf(uidNumber / blockSize) , uidNumber % blockSize
  else
    blockSeq , lineSeq = bit.rshift(uidNumber , blockPower) , bit.band(uidNumber , blockSize - 1)
  end
  
  local key4N  = table.concat({tableName , "-" , dateTime , "-Nest"})
  local keyT4V = {tableName , "-" , dateTime , "-" , 5 , "-" , "VS"}
  local keyT4M = {tableName , "-" , dateTime , "-" , 5 , "-" , 7 , "-BMP-" , blockSeq}
  
  local write = function(keyT4V , keyT4M , index , val , lineSeq , expireTime)
    local oldBit
    keyT4V[5] , keyT4M[5] = index , index
    local key4V = table.concat(keyT4V)
    local isExist = redis.call("SISMEMBER" , key4V , val)
    if isExist == 1 then
      keyT4M[7] = val
      local key4M = table.concat(keyT4M)
      oldBit = redis.call("setbit" , key4M , lineSeq , 1)  
    else
      local count = redis.call("SCARD" , key4V)
      if count >= 1024 then
        val = "IGNORE"
      end
      if count <= 1024 then 
        redis.call("SADD" , key4V , val)
      end      
      if count == 0 and expire then
        redis.call("expire" , key4V , expireTime)
      end
      keyT4M[7] = val
      local key4M = table.concat(keyT4M)
      oldBit = redis.call("setbit" , key4M , lineSeq , 1)
      if expire then
        redis.call("expire" , key4M , expireTime)
      end
    end
    return oldBit
  end

  local oldBitNum = 0
  if append then  
    for seq = 1 , #ARGV , 2 do
      oldBitNum = oldBitNum + write(keyT4V , keyT4M , ARGV[seq] , ARGV[seq + 1] , lineSeq , expireTime)
    end
  else
    for index , val in ipairs(ARGV) do
      oldBitNum = oldBitNum + write(keyT4V , keyT4M , index , val , lineSeq , expireTime) 
    end
  end
  if oldBitNum == 0 then
    redis.call("setbit" , key4N , blockSeq , 1)
  end
  return true
]]
)
scripts:set("WriteScript", scriptWrite)

OpenrestyLua包装的查询接口

local ngx_re      = require "ngx.re"
local resty_redis = require "resty.redis"
local redis       = resty_redis:new()
redis:set_timeout(1000)
local ok , err    = redis:connect("127.0.0.1", 6379)

if err then
  ngx.say(err)
  return
end

local scripts = ngx.shared.scripts

local tableName = ngx.var.arg_table
local dateTime  = ngx.var.arg_datetime

local dimsStr   = ngx.var.arg_dims
local dims      = ngx_re.split(dimsStr , ",")

local r = redis:evalsha(scripts:get("ReadScript") , 2 , tableName , dateTime , unpack(dims))
ngx.say(r)
redis:close()