OpenResty 服务发现


结合 Consul 做服务发现和负载均衡(此处用 Consul 的 service tag 作为请求的 path)
准备环境

准备lua script

创建lua/data.lua,内容

 local _M = {}

 local data = {
 }

 --- Look up the value stored under `name` in the module-local `data` table.
 -- @param name key to look up
 -- @return the stored value, or nil when nothing is stored under `name`
 function _M.get(name)
   local entry = data[name]
   return entry
 end

 --- Merge `new_data` into the module-local `data` table.
 -- Every key present in `new_data` is written (overwriting any previous
 -- value); keys that are absent from `new_data` are left untouched.
 -- @param new_data table whose key/value pairs are copied into `data`
 function _M.set(new_data)
   for key, value in pairs(new_data) do
     data[key] = value
   end
 end

 return _M

创建lua/balance.lua,内容

-- balancer_by_lua handler: picks one upstream peer at random for the current
-- request from the "host:port" list stored in the lua/data module.
--
-- The module name must stay exactly "lua/data": `require` caches modules by
-- the string it is given, so a different spelling would load a second, empty
-- copy of the table.
local data = require("lua/data")
local balancer = require("ngx.balancer")

local math = require("math")

-- Log the reason and abort the request with the same 500 status as before.
local function fail(msg)
  ngx.log(ngx.ERR, msg)
  return ngx.exit(500)
end

-- The raw request URI is tried first (original behaviour). It includes the
-- query string, so fall back to the normalized URI without arguments.
local srvs = data.get(ngx.var.request_uri) or data.get(ngx.var.uri)
if type(srvs) ~= "table" or #srvs == 0 then
  return fail("no upstream peer registered for " .. tostring(ngx.var.request_uri))
end

local cur = srvs[math.random(#srvs)]

-- Split "host:port" at the last colon and make sure both halves are usable.
local host, port = string.match(cur, "^(.*):(%d+)$")
port = tonumber(port)
if not host or host == "" or not port then
  return fail("malformed upstream address: " .. tostring(cur))
end

local ok, err = balancer.set_current_peer(host, port)
if not ok then
  return fail("failed to set current peer " .. cur .. ": " .. tostring(err))
end

创建lua/worker.lua,内容

local data = require("lua/data")
local http = require("resty.http")
local cjson = require("cjson")
-- NOTE(review): presumably makes cjson mark decoded JSON arrays with its
-- array metatable -- confirm against the lua-cjson (OpenResty fork) docs.
cjson.decode_array_with_array_mt(true)

-- Delay handed to every ngx.timer.at call in this file (the polling interval).
local delay = 2
-- Local alias for ngx.timer.at, used to arm and re-arm the polling timer.
local new_timer = ngx.timer.at

--- Report whether table `t` holds a value under `key`.
-- A direct raw lookup replaces the former scan over every pair: the answer
-- is the same (a key is present exactly when its raw value is non-nil) but
-- no longer costs a full traversal of the table.
-- NOTE(review): still defined as a global function, as before, so that any
-- other chunk calling `exist` keeps working; consider making it local.
-- @param t table to inspect
-- @param key key to look for
-- @return true when `key` is present in `t`, false otherwise
function exist(t, key)
  return rawget(t, key) ~= nil
end

--- Fetch all instances of one Consul service and publish their addresses.
-- Queries the local Consul agent's catalog for `name`, groups the
-- "address:port" string of every instance under the instance's first service
-- tag (the tag is used as the request path), and hands the groups to data.set.
-- @param name Consul service name
-- @return true on success; nil plus an error message on failure
local get_service = function(name)
    local httpc = http.new()
    local res, err = httpc:request_uri("http://127.0.0.1:8500/v1/catalog/service/" .. ngx.escape_uri(name), {
      method = "GET",
    })
    if not res then
      -- This runs inside a timer, where there is no client to ngx.say to.
      ngx.log(ngx.ERR, "failed to request: ", err)
      return nil, err
    end
    if res.status ~= 200 then
      ngx.log(ngx.ERR, "unexpected status ", res.status, " for service ", name)
      return nil, "unexpected status " .. tostring(res.status)
    end

    -- cjson.decode raises on malformed input; keep that from escaping.
    local decoded_ok, instances = pcall(cjson.decode, res.body)
    if not decoded_ok or type(instances) ~= "table" then
      ngx.log(ngx.ERR, "invalid catalog response for service ", name, ": ", tostring(instances))
      return nil, "invalid catalog response"
    end

    local all = {}
    for _, v in ipairs(instances) do
      -- JSON null decodes to a non-table value, hence the type checks.
      local path = type(v) == "table"
        and type(v.ServiceTags) == "table"
        and v.ServiceTags[1]
      if path then
        -- An empty ServiceAddress means the service uses its node's address.
        local addr = v.ServiceAddress
        if type(addr) ~= "string" or addr == "" then
          addr = v.Address
        end
        if type(addr) == "string" and addr ~= "" and tonumber(v.ServicePort) then
          if all[path] == nil then
            all[path] = {}
          end
          table.insert(all[path], addr .. ":" .. v.ServicePort)
        end
      end
    end

    data.set(all)
    return true
end

--- Refresh the stored addresses of every service in the Consul catalog.
-- Lists the service names known to the local Consul agent and calls
-- get_service for each of them.
-- Deliberately returns nothing: failures are logged instead of returned, so
-- a caller that treats a return value as an error keeps polling.
local get_services = function()
    local httpc = http.new()
    local res, err = httpc:request_uri("http://127.0.0.1:8500/v1/catalog/services", {
      method = "GET",
    })
    if not res then
      ngx.log(ngx.ERR, "failed to list consul services: ", err)
      return
    end
    if res.status ~= 200 then
      ngx.log(ngx.ERR, "unexpected status ", res.status, " while listing consul services")
      return
    end

    -- cjson.decode raises on malformed input; keep that from escaping.
    local decoded_ok, services = pcall(cjson.decode, res.body)
    if not decoded_ok or type(services) ~= "table" then
      ngx.log(ngx.ERR, "invalid service list from consul: ", tostring(services))
      return
    end

    -- The response maps service name -> tag list; only the names are needed.
    for service_name in pairs(services) do
      -- One failing service must not prevent the others from refreshing.
      local call_ok, call_err = pcall(get_service, service_name)
      if not call_ok then
        ngx.log(ngx.ERR, "failed to refresh service ", service_name, ": ", tostring(call_err))
      end
    end
    return
end



--- Timer callback: refresh the service table, then re-arm the timer.
-- A failed refresh is logged and the timer is re-armed anyway, so a
-- transient Consul outage does not end polling for good.
-- NOTE(review): still assigned to a global, as before; nothing visible in
-- this file needs that -- consider `local function pull`.
-- @param premature true when the timer fires early (ngx.timer.at callback
--   argument); the callback then returns without re-arming
pull = function(premature)
    if premature then
        return
    end

    -- Protected call: an error raised during the refresh must not skip the
    -- re-arm below.
    local ok, err = pcall(get_services)
    if not ok or err then
        ngx.log(ngx.ERR, "failed to refresh services: ", tostring(err))
    end

    local armed, timer_err = new_timer(delay, pull)
    if not armed then
        ngx.log(ngx.ERR, "failed to create timer: ", timer_err)
        return
    end
end

-- Arm the first poll; `pull` re-arms the timer itself after every run.
local ok, err = new_timer(delay, pull)
if not ok then
    ngx.log(ngx.ERR, "failed to create timer: ", err)
end

准备服务

准备test1.go,内容

package main

import (
    "flag"
    "fmt"
    "io"
    "net/http"
    "time"

    "github.com/hashicorp/consul/api"
)

const (
    // name is the service name registered with Consul; main also uses it as
    // the prefix of the generated service ID.
    name = "test"
)

var (
    // port is the TCP port to listen on and to register with Consul.
    // The default 0 makes main print the usage text and exit.
    port = flag.Int("port", 0, "-port=5003")
    // tags are the service tags sent with the Consul registration.
    tags = []string{"/"}
)

// main registers this process with the local Consul agent as an instance of
// the service `name`, including an HTTP health check, and then serves two
// endpoints on the port given by -port:
//
//	/        replies with the port the instance listens on
//	/health  replies with an empty 200, used by the Consul health check
//
// It panics on any Consul or listener error.
func main() {

    flag.Parse()
    if *port == 0 {
        flag.Usage()
        return
    }

    consulCfg := api.DefaultConfig()
    client, err := api.NewClient(consulCfg)
    if err != nil {
        panic(err)
    }
    nodeName, err := client.Agent().NodeName()
    if err != nil {
        panic(err)
    }
    node, _, err := client.Catalog().Node(nodeName, nil)
    if err != nil {
        panic(err)
    }
    if node == nil {
        panic("found no node")
    }
    lanAddr := node.Node.TaggedAddresses["lan"]
    wanAddr := node.Node.TaggedAddresses["wan"]
    if lanAddr == "" || wanAddr == "" {
        panic("node addr empty")
    }

    address := node.Node.Address
    // port is a *int: it must be dereferenced, otherwise %d formats the
    // pointer value and the health check targets a bogus port.
    endpoint := fmt.Sprintf("http://%s:%d/health", address, *port)
    checkCfg := &api.AgentServiceCheck{
        Interval: "5s",
        HTTP:     endpoint,
        Timeout:  "10s",
        DeregisterCriticalServiceAfter: "1m",
    }
    // The port is part of the ID so that two instances started within the
    // same second do not register under the same service ID.
    srvId := fmt.Sprintf("%s-%d-%d", name, *port, time.Now().Unix())
    err = client.Agent().ServiceRegister(&api.AgentServiceRegistration{
        ID:      srvId,
        Name:    name,
        Address: address,
        Port:    *port,
        Check:   checkCfg,
        Tags:    tags,
    })
    if err != nil {
        panic(err)
    }

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        io.WriteString(w, fmt.Sprintf("i'm on %d", *port))
    })
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        // An empty handler answers 200 OK, which is all the check needs.
    })
    // ListenAndServe only returns on failure (e.g. the port is in use).
    if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil {
        panic(err)
    }

}

下载lua依赖

cd ~/work
git clone https://github.com/ledgetech/lua-resty-http.git

修改nginx 配置

worker_processes  1;
error_log logs/error.log;
events {
    worker_connections 1024;
}
http {
    upstream backend {
        # At least one server entry is required here (nginx rejects the
        # configuration otherwise); its address can be anything, because
        # the real peer is chosen by balance.lua.
        server 192.168.0.2:8080;
        balancer_by_lua_file lua/balance.lua;
    }
    init_worker_by_lua_file lua/worker.lua;
    lua_package_path "/home/vagrant/work/lua-resty-http/lib/?.lua;;";
    server {
        listen 8080;
        location / {
            proxy_pass http://backend;
        }
    }
}

下载consul程序

运行consul

consul agent -dev

运行nginx

cd ~/work
nginx -p `pwd`/ -c conf/nginx.conf

运行服务

go build -o test test1.go
./test -port 5003
./test -port 5004

验证

curl http://127.0.0.1:8080/
可以获得i'm on 5003或者i'm on 5004的响应

相关内容

    暂无相关文章