Nginx + Lua + Kafka + Redis + MySQL(标签:nginx, kafka)


写在开头

# OS release used for this walkthrough (output of: cat /etc/issue)
CentOS release 6.8 (Final)
# Switch to /tmp; later steps reference sources by /tmp/... paths
cd /tmp

安装 LuaJIT

# Download the LuaJIT 2.0.5 source tarball
wget http://luajit.org/download/LuaJIT-2.0.5.tar.gz
# Unpack
tar zxf LuaJIT-2.0.5.tar.gz

cd LuaJIT-2.0.5
# Compile, using /usr/local/LuaJIT as the install prefix
make PREFIX=/usr/local/LuaJIT
# Install
make install PREFIX=/usr/local/LuaJIT

# Register the LuaJIT lib directory with the dynamic linker and refresh its cache
echo "/usr/local/LuaJIT/lib" > /etc/ld.so.conf.d/usr_local_luajit_lib.conf
ldconfig

# Export the LuaJIT library and header locations
# (presumably consumed by the nginx ./configure step when it builds
# lua-nginx-module; they only last for the current shell session)
export LUAJIT_LIB=/usr/local/LuaJIT/lib
export LUAJIT_INC=/usr/local/LuaJIT/include/luajit-2.0

# Return to /tmp so the following downloads are unpacked under /tmp,
# which is where the later cp and ./configure commands look for them
cd /tmp

下载lua-nginx-module

# Download the lua-nginx-module v0.10.13 source archive
wget https://codeload.github.com/openresty/lua-nginx-module/tar.gz/v0.10.13
# Unpack the downloaded archive
tar zxf v0.10.13

下载ngx_devel_kit

# Download the ngx_devel_kit v0.3.0 source archive
wget https://github.com/simplresty/ngx_devel_kit/archive/v0.3.0.tar.gz
# Unpack the downloaded archive
tar zxf v0.3.0.tar.gz

下载lua-resty-kafka

# Download the lua-resty-kafka master branch as a zip archive
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
# Unpack
unzip master.zip

# Create the target directory (-p: no error when it already exists)
mkdir -p /usr/local/lua
# Copy the unpacked library into place
cp -r lua-resty-kafka-master /usr/local/lua/lua-resty-kafka

安装 pcre

# Download the PCRE 8.42 source tarball
wget https://jaist.dl.sourceforge.net/project/pcre/pcre/8.42/pcre-8.42.tar.bz2
# Unpack
tar -jxf pcre-8.42.tar.bz2

cd pcre-8.42
# Configure
./configure
# Compile
make
# Install
make install

# Symlink the shared library into /lib64 (-f: replace an existing link)
ln -sf /usr/local/lib/libpcre.so.1 /lib64/

# Return to /tmp so later steps do not run inside the PCRE source tree
cd /tmp

安装openresty

# Package installation instructions: http://openresty.org/cn/linux-packages.html

# Install build prerequisites
yum install pcre-devel openssl-devel gcc curl
# Install yum-utils (presumably for the yum-config-manager command used next)
sudo yum install yum-utils
# Register the OpenResty yum repository
sudo yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo
# Install the openresty package
sudo yum install openresty

拷贝lua-resty-kafka

# Create the resty/kafka module directory (-p: no error when it already exists)
mkdir -p /usr/local/openresty/lualib/resty/kafka
# Copy the kafka Lua modules into the OpenResty lualib tree
cp -R /tmp/lua-resty-kafka-master/lib/resty/kafka/* /usr/local/openresty/lualib/resty/kafka/

安装nginx

# Download the nginx 1.9.8 source tarball
wget http://nginx.org/download/nginx-1.9.8.tar.gz
# Unpack
tar zxf nginx-1.9.8.tar.gz

cd nginx-1.9.8

# Manual step: change NGX_CONF_BUFFER in src/core/ngx_conf_file.c to 8192 or larger to avoid "too long parameter" errors

# Configure: install under /usr/local/nginx/, binary at /usr/bin/nginx, with the stub_status, ssl and realip modules, the lua-nginx-module and ngx_devel_kit sources from /tmp, and PCRE (with JIT) from /tmp/pcre-8.42
./configure --prefix=/usr/local/nginx/ --sbin-path=/usr/bin/nginx --with-http_stub_status_module --with-http_ssl_module --with-http_realip_module --add-module=/tmp/lua-nginx-module-0.10.13 --add-module=/tmp/ngx_devel_kit-0.3.0 --with-pcre=/tmp/pcre-8.42 --with-pcre-jit

# Compile
make

# Install
make install

nginx 配置

# Minimal example: report the Lua runtime version on / and forward the raw
# request body to Kafka on /api/v1/sdk/import.
worker_processes  1;

events {
    worker_connections  1024;
}


http {
    # CORS response headers.
    add_header Access-Control-Allow-Origin *;
    add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS';
    add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';

    default_type  "application/json;charset=UTF-8";

    # Search paths for Lua modules and native Lua libraries.
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    lua_package_cpath "/usr/local/openresty/lualib/?.so;;";

    resolver 127.0.0.1;

    server {
        listen       8899;

        # Read the request body up front; similar in effect to calling
        # ngx.req.read_body() in the handler.
        lua_need_request_body on;

        location / {
            # Smoke test: print the LuaJIT version, or the plain Lua version
            # when the jit table is not available.
            content_by_lua_block {
                ngx.header.content_type = "text/plain"
                if jit then
                    ngx.say(jit.version)
                else
                    ngx.say(_VERSION)
                end
            }
        }

        location /api/v1/sdk/import {

            content_by_lua_block {
                -- cjson must be required explicitly; response() encodes with it.
                local cjson = require "cjson"
                local producer = require "resty.kafka.producer"

                -- Emit a JSON response body carrying errcode and errmsg.
                local function response(code, msg)
                    local resp = {}
                    resp["errcode"] = code
                    resp["errmsg"] = msg
                    ngx.say(cjson.encode(resp))
                end

                local broker_list = {
                    { host = "192.168.1.117", port = 9092 }
                }

                local bp = producer:new(broker_list, { producer_type = "sync" })

                -- NOTE(review): ngx.var.request_body is presumably nil for an
                -- empty body or one spooled to a temp file; confirm whether
                -- the body should be validated before it is sent.
                local ok, err = bp:send("sdk-receive", nil, ngx.var.request_body)
                if not ok then
                    -- Keep the cause in the error log; the client only sees
                    -- the generic error code.
                    ngx.log(ngx.ERR, "kafka send failed: ", err)
                    response(10000, "系统错误")
                else
                    response(0, "请求成功")
                end
            }
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

完整示例

# Full example: resolve the caller by its secret (Redis cache backed by
# MySQL), enrich the JSON payload and publish it to Kafka.
worker_processes  1;

events {
    worker_connections  1024;
}

http {
    # CORS response headers.
    add_header Access-Control-Allow-Origin *;
    add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS';
    add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';

    default_type  "application/json;charset=UTF-8";

    # Search paths for Lua modules and native Lua libraries.
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    lua_package_cpath "/usr/local/openresty/lualib/?.so;;";

    resolver 127.0.0.1;

    server {
        listen       8899;

        # Read the request body up front; similar in effect to calling
        # ngx.req.read_body() in the handler.
        lua_need_request_body on;

        location /api/v1/sdk/import {

            content_by_lua_block {
                local cjson = require "cjson"

                -- Emit a JSON response body carrying errcode and errmsg.
                local function response(code, msg)
                    local resp = {}
                    resp["errcode"] = code
                    resp["errmsg"] = msg
                    ngx.say(cjson.encode(resp))
                end

                -- Read secret from the query string. get_uri_args() yields a
                -- table for a repeated argument and true for one without a
                -- value, so anything but a non-empty string is rejected.
                local secret = ngx.req.get_uri_args()["secret"]
                if type(secret) ~= "string" or secret == "" then
                    response(10003, "缺少secret参数")
                    return
                end

                -- Decode the body under pcall because cjson.decode raises on
                -- malformed input. A decoded non-table value is rejected too,
                -- since fields are assigned onto it further down.
                local decoded, data = pcall(cjson.decode, ngx.var.request_body)
                if not decoded or type(data) ~= "table" then
                    response(10001, "json数据格式化失败")
                    return
                end

                -- Look up the data source for this secret in Redis.
                local redis = require("resty.redis")
                local redis_instance = redis:new()
                redis_instance:set_timeout(1000)
                local ok, err = redis_instance:connect("192.168.1.117", 6379)
                if not ok then
                    ngx.log(ngx.ERR, "redis connect failed: ", err)
                    response(10006, "获取数据源信息失败")
                    return
                end

                -- Report a failure and drop the Redis connection.
                local function fail(code, msg)
                    redis_instance:close()
                    response(code, msg)
                end

                local cache_key = string.format("sdk_auth_%s", secret)
                local resp
                -- Only read the key once database 6 is actually selected.
                ok, err = redis_instance:select(6)
                if ok then
                    resp, err = redis_instance:get(cache_key)
                end
                if err then
                    ngx.log(ngx.ERR, "redis lookup failed: ", err)
                    fail(10009, "获取数据源信息失败")
                    return
                end

                if type(resp) == "string" then
                    -- Cache hit: the value is the JSON written on a cache miss.
                    local parsed, cached = pcall(cjson.decode, resp)
                    if not parsed or type(cached) ~= "table" then
                        fail(10009, "获取数据源信息失败")
                        return
                    end
                    if not cached["auth"] then
                        fail(10010, "secret已失效")
                        return
                    end
                    data["cid"] = cached["cid"]
                    data["srcId"] = cached["sid"]
                else
                    -- Cache miss: load the record from MySQL, then cache it.
                    local mysql = require("resty.mysql")
                    local db, new_err = mysql:new()
                    if not db then
                        ngx.log(ngx.ERR, "mysql init failed: ", new_err)
                        fail(10007, "获取数据源信息失败")
                        return
                    end
                    db:set_timeout(1000)
                    -- NOTE(review): example connection settings and credentials.
                    local connected, connect_err = db:connect{
                        host = "192.168.1.117",
                        port = 3306,
                        database = "database_test",
                        user = "root",
                        password = "root",
                        charset = "utf8"
                    }
                    if not connected then
                        ngx.log(ngx.ERR, "mysql connect failed: ", connect_err)
                        fail(10008, "获取数据源信息失败")
                        return
                    end

                    -- ngx.quote_sql_str escapes the value and wraps it in
                    -- single quotes, so a crafted secret cannot change the
                    -- statement.
                    local sql = "select id,cid from sdk where del_flag=0 and secret="
                        .. ngx.quote_sql_str(secret)
                    local res, query_err = db:query(sql)
                    if not res or res[1] == nil then
                        if not res then
                            ngx.log(ngx.ERR, "mysql query failed: ", query_err)
                        end
                        db:close()
                        fail(10010, "secret不存在或已失效")
                        return
                    end
                    -- Query finished: return the MySQL connection to the pool.
                    db:set_keepalive(10000, 50)

                    local row = res[1]
                    data["srcId"] = row["id"]
                    data["cid"] = row["cid"]
                    local auth_data = {
                        auth = true,
                        sid = row["id"],
                        cid = row["cid"]
                    }
                    redis_instance:set(cache_key, cjson.encode(auth_data))
                end

                -- Lookup finished: return the Redis connection to the pool.
                redis_instance:set_keepalive(10000, 50)

                data["srcType"] = "sdk"

                -- Publish the enriched payload to Kafka synchronously.
                local producer = require "resty.kafka.producer"
                local broker_list = {
                    { host = "192.168.1.117", port = 9092 }
                }
                local bp = producer:new(broker_list, { producer_type = "sync" })
                local sent, send_err = bp:send("sdk-receive", nil, cjson.encode(data))

                if not sent then
                    ngx.log(ngx.ERR, "kafka send failed: ", send_err)
                    response(10000, "系统错误")
                else
                    response(0, "请求成功")
                end
            }
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

测试

# POST a sample event; the path must match the /api/v1/sdk/import location
curl -X POST -d '{"type":"user","properties":{"mobile":"18355558888","sex":"男"}}' 'http://localhost:8899/api/v1/sdk/import?secret=6051d46e-096a-4160-b979-7598202ea23'

相关内容

    暂无相关文章