Nginx一致性哈希模块的Lua实现,


Nginx一致性哈希模块的Lua重新实现

 

技术背景:

最近在工作中使用了nginx+redis 的架构,redis在后台做分布式存储,每个redis都存放不同的数据,这些数据都是某门户网站通过Hadoop分析出来的用户行为日志,key是uid,value是user profile,每小时更新量在500-800万条记录,而这些记录一旦生成,我需要在5分钟左右的时间完成所有导入过程。

 

首先,我在nginx中使用了第三方模块HttpUpstreamConsistent来做负载均衡策略,针对不同用户(uid)选取不同的backend redis:

   upstream somestream {
      consistent_hash $arg_uid;
      server 10.50.1.3:11211;
      server 10.50.1.4:11211;
      server 10.50.1.5:11211;
    }

现在问题来了,由于Hadoop系统处理日志的速度非常快,如果把每条记录都通过Nginx来写入Redis中,这样的速度是无法接受的,而且会影响Nginx对正常请求的服务能力。所以,需要将这些数据以离线的方式导入redis集群中,这样就要重新实现HttpUpstreamConsistent模块了,才能保证读写的哈希策略一致。

 

下面的源码演示了如何将HttpUpstreamConsistent模块翻译成Lua的过程,(使用了CRC32作散列,依赖库的路径已列在Reference中)。

#!/usr/bin/lua

-- chenqi@2014/04/02
--[Reference]
--https://github.com/yaoweibin/ngx_http_consistent_hash
--https://github.com/davidm/lua-digest-crc32lua

local CRC = require('CRC32')

local M = {}

local CONSISTENT_BUCKETS = 1024
local VIRTUAL_NODE = 160

local HASH_PEERS = {}
local CONTINUUM = {}
local BUCKETS = {}

local function hash_fn(key)
    return CRC.crc32(key)
end

-- in-place quicksort
function quicksort(array,compareFunc)  
    quick(array,1,#array,compareFunc)  
end  

function quick(array,left,right,compareFunc)  
    if(left < right ) then  
        local index = partion(array,left,right,compareFunc)  
        quick(array,left,index-1,compareFunc)  
        quick(array,index+1,right,compareFunc)  
    end  
end  
  
function partion(array,left,right,compareFunc)  
    local key = array[left] 
    local index = left  
    array[index],array[right] = array[right],array[index]
    local i = left  
    while i< right do  
        if compareFunc( key,array[i]) then  
            array[index],array[i] = array[i],array[index]
            index = index + 1  
        end  
        i = i + 1  
    end  
    array[right],array[index] = array[index],array[right]
    return index;  
end  

-- binary search
local function chash_find(point)
    local mid, lo, hi = 1, 1, #CONTINUUM
    while 1 do
        if point <= CONTINUUM[lo][2] or point > CONTINUUM[hi][2] then
            return CONTINUUM[lo]
        end

        -- test middle point
        mid = lo + math.floor((hi-lo)/2)

        -- perfect match
        if point <= CONTINUUM[mid][2] and point > (mid > 1 and CONTINUUM[mid-1][2] or 0) then
            return CONTINUUM[mid]
        end

        -- too low, go up
        if CONTINUUM[mid][2] < point then
            lo = mid + 1
        else
            hi = mid - 1
        end
    end
end

local function chash_init()
    local n = #HASH_PEERS
    if n == 0 then
        print("There is no backend servers")
        return
    end

    local C = {}
    for i,peer in ipairs(HASH_PEERS) do
        for k=1, math.floor(VIRTUAL_NODE * peer[1]) do
            local hash_data = peer[2] .. "-" .. (k - 1)
            table.insert(C, {peer[2], hash_fn(hash_data)})
        end
    end

    quicksort(C, function(a,b) return a[2] > b[2] end)
    CONTINUUM = C

--[[
    for i=1,#C do
        print(CONTINUUM[i][1],CONTINUUM[i][2])
    end
--]]

    local step = math.floor(0xFFFFFFFF / CONSISTENT_BUCKETS)

    BUCKETS = {}
    for i=1, CONSISTENT_BUCKETS do
        table.insert(BUCKETS, i, chash_find(math.floor(step * (i - 1))))
        -- print(BUCKETS[i][1],BUCKETS[i][2])
    end

end
M.init = chash_init

local function chash_get_upstream_crc32(point)
    return BUCKETS[(point % CONSISTENT_BUCKETS)+1][1]
end
M.get_upstream_crc32 = chash_get_upstream_crc32

local function chash_get_upstream(key)
    local point = math.floor(hash_fn(key)) 
    return chash_get_upstream_crc32(point)
end
M.get_upstream = chash_get_upstream

local function chash_add_upstream(upstream, weigth)
    weight = weight or 1
    table.insert(HASH_PEERS, {weight, upstream})
end
M.add_upstream = chash_add_upstream

return M

 

API调用方式:

local redis_login= {
    "10.50.1.3:11211",
    "10.50.1.4:11211",
    "10.50.1.5:11211",
}

for k, backend in ipairs(redis_login) do
    chash_login.add_upstream(backend)
end
chash_login.init()

uid="309473941"
chash_login.chash_get_upstream(uid)

返回一个backend地址,将该uid对应的数据写入对应的redis中即可,稍后可以使用Nginx读到。

 

PS:关于redis的mass insertion问题,最高效的方式是批量写入文件(文件格式遵循redis协议),然后使用 redis-cli --pipe 直接导入。

 

 

相关内容

    暂无相关文章