Openresty学习(六):模块开发:通过Upstream机制访问第三方协议服务器,openrestyupstream


Upstream机制是nginx中的核心模块之一,基于Upstream模块可以对上游的各种服务器或组件进行访问,如web服务器、memcache、反向代理、数据库和redis访问。正是Upstream模块的支持,使nginx可以异步访问上述组件,保证Nginx的性能。

本文实现nginx模块(upstream_oth):基于Upstream访问第三方TCP自定义协议服务器,解析第三方服务器的响应包头,并将包体返回给http客户端。

 

运行逻辑介绍:

1. 通过curl访问nginx上的upstream_oth模块:curl http://127.0.0.1:80/up_oth_test

2. upstream_oth模块通过upstream机制访问TCP 服务器

3. TCP服务器返回自定义数据(含包头)

4. upstream_oth模块解析包头获取包体长度和编码格式,返回包体给curl客户端

Nginx配置:

http {
        upstream up_oth_backend {
            server localhost:10000;
        }

      server  {

           location /up_oth_test {
           #internal;
           uptream_oth_pass up_oth_backend;
           }
    }

}

 

测试步骤:

1. 启动nginx

2. 启动tcp_server

3. curl访问nginx

 

测试结果:

root@iZ2ze3uk97m2bfv681kyx4Z:~# curl http://127.0.0.1:80/up_oth_test
{"response": "good job"}

 

其它说明:

1.可以修改为将客户端消息体发送给TCP服务器来完成不同的功能

2. 自定义协议的头部和包体可以根据需要修改

3.文中upstream配置参数为硬编码,可根据需要修改为配置

 

 

第三方服务器响应消息头(由两个int字段组成:type表示包体的编码格式,length表示包体长度;包头之后紧跟包体):

 

tcp_server.c:

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>

#include <arpa/inet.h>
#include <netinet/in.h>
  
  
/* Address the test server binds to. */
#define SERVER_IP "127.0.0.1"

/* Integer settings of the test server. */
enum {
    SERVER_PORT    = 10000, /* TCP port the server listens on */
    MAX_RECV_LEN   = 1024,  /* size of the per-read receive buffer */
    MAX_CLIENT_NUM = 30,    /* number of client sockets tracked at once */
    BACK_LOG       = 20     /* backlog passed to listen() */
};

/*
 * Fixed-size header that precedes every response body on the wire.
 * It is written exactly as laid out in memory (host byte order).
 */
typedef struct {
    int type;   /* the server sets 1 for its JSON response body */
    int length; /* number of body bytes following the header */
} msg_header_t;

/* Main loop keeps running while this is non-zero. */
static int running = 1;
  
  
/*
 * Minimal select()-based TCP test server for the upstream_oth nginx module.
 *
 * Binds to SERVER_IP:SERVER_PORT, tracks up to MAX_CLIENT_NUM client sockets
 * and answers every chunk of data read from a client with one framed
 * message: a msg_header_t (type = 1, length = body size) immediately
 * followed by a fixed JSON body. The header is sent in host byte order, so
 * the peer must run on a machine with the same endianness and int size.
 *
 * Exits with status 1 if socket setup or accept() fails; returns 1 if
 * select() fails with an unrecoverable error; returns 0 when the loop ends.
 */
int main(int argc, char *argv[])
{
    static const char response_msg[] = "{\"response\": \"good job\"}";

    struct sockaddr_in serv_addr;
    struct sockaddr_in cli_addr;
    socklen_t cli_addr_len;
    struct timeval timeout;
    fd_set read_set;
    fd_set select_read_set;
    msg_header_t msg_hdr;
    char recv_buf[MAX_RECV_LEN];
    char response_buf[sizeof(msg_header_t) + sizeof(response_msg)];
    const size_t body_len = sizeof(response_msg) - 1;
    const size_t response_len = sizeof(msg_header_t) + body_len;
    int client_fd[MAX_CLIENT_NUM];
    int reuse_addr = 1;
    int exit_code = 0;
    int sock_fd;
    int new_conn_fd;
    int max_fd;
    int ready;
    int i;
    ssize_t num;

    (void)argc;
    (void)argv;

    for (i = 0; i < MAX_CLIENT_NUM; i++)
    {
        client_fd[i] = -1;
    }

    /*
     * The response never changes, so frame it once. The header is copied
     * with memcpy instead of being written through a casted char pointer.
     */
    msg_hdr.type = 1;
    msg_hdr.length = (int)body_len;
    memcpy(response_buf, &msg_hdr, sizeof(msg_hdr));
    memcpy(response_buf + sizeof(msg_hdr), response_msg, body_len);

    sock_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (sock_fd < 0)
    {
        perror("Fail to socket");
        exit(1);
    }

    if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR,
                &reuse_addr, sizeof(reuse_addr)) < 0)
    {
        perror("Fail to setsockopt");
        exit(1);
    }

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(SERVER_PORT);
    serv_addr.sin_addr.s_addr = inet_addr(SERVER_IP);

    if (bind(sock_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
    {
        perror("Fail to bind");
        exit(1);
    }
    if (listen(sock_fd, BACK_LOG) < 0)
    {
        perror("Fail to listen");
        exit(1);
    }

    FD_ZERO(&read_set);
    FD_SET(sock_fd, &read_set);

    while (running)
    {
        /* select() needs the highest descriptor currently being watched. */
        max_fd = sock_fd;
        for (i = 0; i < MAX_CLIENT_NUM; i++)
        {
            if (max_fd < client_fd[i])
            {
                max_fd = client_fd[i];
            }
        }

        /* select() may modify both the set and the timeout: reset them. */
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;
        select_read_set = read_set;

        ready = select(max_fd + 1, &select_read_set, NULL, NULL, &timeout);
        if (ready < 0)
        {
            if (errno == EINTR)
            {
                continue;
            }
            /* Any other error would repeat on every iteration: give up. */
            perror("Fail to select");
            exit_code = 1;
            break;
        }
        if (ready == 0)
        {
            continue; /* timeout, nothing to do */
        }

        if (FD_ISSET(sock_fd, &select_read_set))
        {
            printf("new client comes\n");
            memset(&cli_addr, 0, sizeof(cli_addr));
            cli_addr_len = sizeof(cli_addr);
            new_conn_fd = accept(sock_fd, (struct sockaddr *)&cli_addr,
                    &cli_addr_len);
            if (new_conn_fd < 0)
            {
                perror("Fail to accept");
                exit(1);
            }

            /* Store the connection in the first free slot, if any. */
            for (i = 0; i < MAX_CLIENT_NUM; i++)
            {
                if (client_fd[i] == -1)
                {
                    break;
                }
            }
            if (i == MAX_CLIENT_NUM || new_conn_fd >= FD_SETSIZE)
            {
                /* No room to track it: close instead of leaking the fd. */
                printf("Too many clients, reject client(%d)\n", new_conn_fd);
                close(new_conn_fd);
            }
            else
            {
                client_fd[i] = new_conn_fd;
                FD_SET(new_conn_fd, &read_set);
            }
        }

        for (i = 0; i < MAX_CLIENT_NUM; i++)
        {
            if (-1 == client_fd[i]
                    || !FD_ISSET(client_fd[i], &select_read_set))
            {
                continue;
            }

            /* Leave one byte free for the terminating '\0' added below. */
            num = read(client_fd[i], recv_buf, MAX_RECV_LEN - 1);
            if (num < 0 && errno == EINTR)
            {
                continue; /* still readable, retried on the next select() */
            }

            if (num > 0)
            {
                recv_buf[num] = '\0';
                printf("Recieve client(%d) data\n", client_fd[i]);
                printf("Data: %s\n\n", recv_buf);

                if (write(client_fd[i], response_buf, response_len)
                        == (ssize_t)response_len)
                {
                    continue;
                }
                /* A failed or short write leaves a broken frame: drop. */
                perror("Fail to write");
            }
            else if (num == 0)
            {
                printf("Client(%d) exit\n", client_fd[i]);
            }
            else
            {
                printf("Client(%d) left\n", client_fd[i]);
            }

            FD_CLR(client_fd[i], &read_set);
            close(client_fd[i]);
            client_fd[i] = -1;
        }
    }

    for (i = 0; i < MAX_CLIENT_NUM; i++)
    {
        if (client_fd[i] != -1)
        {
            close(client_fd[i]);
        }
    }
    close(sock_fd);

    return exit_code;
}

 

tcp server编译: gcc -o tcp_server tcp_server.c

 

config:

ngx_addon_name=ngx_http_upstream_oth_module
HTTP_MODULES="$HTTP_MODULES ngx_http_upstream_oth_module"
NGX_ADDON_SRCS="$NGX_ADDON_SRCS $ngx_addon_dir/ngx_http_upstream_oth_module.c"

 

ngx_http_upstream_oth_module.c:

 

#include <ngx_config.h>  
#include <ngx_core.h>  
#include <ngx_http.h>  
  
/* Per-location configuration of the upstream_oth module. */
typedef struct {
    /* upstream settings handed to the request via u->conf */
    ngx_http_upstream_conf_t   upstream;
    /* not referenced by the code in this file */
    ngx_int_t                  index;
    /* not referenced by the code in this file */
    ngx_uint_t                 gzip_flag;
} ngx_http_upstream_oth_loc_conf_t;


/* Per-request state passed to the input filter callbacks. */
typedef struct {
    /* byte counter maintained by the input filter callbacks */
    size_t                     rest;
    /* request this context belongs to */
    ngx_http_request_t        *request;
} ngx_http_up_other_ctx_t;

/*
 * Header that the third-party server puts in front of each response body.
 * The module reads it straight from the receive buffer, so it is interpreted
 * in host byte order.
 */
typedef struct {
    int  type;      /* 1 makes the module answer with a JSON content type */
    int  length;    /* body length, used as the response content length */
} up_other_msg_header_t;

/* Forward declarations of callbacks that are referenced before they are defined. */
static ngx_int_t ngx_http_upstream_oth_handler(ngx_http_request_t *r);
static void *ngx_http_upstream_oth_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_upstream_oth_merge_loc_conf(ngx_conf_t *cf,
    void *parent, void *child);
static char *ngx_http_up_other_pass(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf);

/*
 * Configuration directives provided by this module.
 *
 * The single directive takes one argument inside a location block and is
 * handled by ngx_http_up_other_pass(). Its name is spelled "uptream_oth_pass"
 * (without the 's'); nginx.conf must use exactly this spelling.
 */
static ngx_command_t  ngx_http_upstream_oth_commands[] = {

    { ngx_string("uptream_oth_pass"),       /* directive name */
      NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,   /* location context, one argument */
      ngx_http_up_other_pass,               /* directive handler */
      NGX_HTTP_LOC_CONF_OFFSET,             /* handler receives the loc conf */
      0,                                    /* field offset */
      NULL },                               /* post data */

    ngx_null_command                        /* terminator */
};

/*
 * HTTP module context: only the location-level configuration hooks are
 * provided, every other slot is left empty.
 */
static ngx_http_module_t  ngx_http_upstream_oth_module_ctx = {
    NULL,                                     /* preconfiguration */
    NULL,                                     /* postconfiguration */

    NULL,                                     /* create main configuration */
    NULL,                                     /* init main configuration */

    NULL,                                     /* create server configuration */
    NULL,                                     /* merge server configuration */

    ngx_http_upstream_oth_create_loc_conf,    /* create location configuration */
    ngx_http_upstream_oth_merge_loc_conf      /* merge location configuration */
};
  
/*
 * Module definition registered with nginx. It wires up the HTTP context and
 * the directive table and installs no process or thread lifecycle hooks.
 */
ngx_module_t  ngx_http_upstream_oth_module = {
    NGX_MODULE_V1,
    &ngx_http_upstream_oth_module_ctx,     /* module context */
    ngx_http_upstream_oth_commands,        /* module directives */
    NGX_HTTP_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

/*
 * create_request callback: builds the data that is sent to the upstream
 * server.
 *
 * The request is a single in-memory buffer holding the fixed text
 * "Hello World" (without the terminating NUL); nothing from the client
 * request is forwarded.
 *
 * Returns NGX_OK on success, NGX_ERROR if an allocation from r->pool fails.
 */
static ngx_int_t
ngx_http_up_other_create_request(ngx_http_request_t *r)
{
    ngx_buf_t    *b;
    ngx_chain_t  *cl;

    cl = ngx_alloc_chain_link(r->pool);
    if (cl == NULL) {
        return NGX_ERROR;
    }

    b = ngx_pcalloc(r->pool, sizeof(ngx_buf_t));
    if (b == NULL) {
        /* the original dereferenced b without this check */
        return NGX_ERROR;
    }

    /* The buffer points at a string literal, so mark it as read-only memory. */
    b->pos = (u_char *) "Hello World";
    b->last = b->pos + sizeof("Hello World") - 1;
    b->memory = 1;

    cl->buf = b;
    cl->next = NULL;

    r->upstream->request_bufs = cl;

    /* No format arguments, so the zero-argument debug macro is the right one. */
    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http up other request");

    return NGX_OK;
}

/*
 * abort_request callback: there is nothing to clean up, so it only writes a
 * debug log line. The message now names this module instead of "memcached".
 */
static void
ngx_http_up_other_abort_request(ngx_http_request_t *r)
{
    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "abort http up other request");
}

/*
 * finalize_request callback: the module holds no resources of its own, so
 * this only writes a debug log line. The rc argument is not used.
 */
static void
ngx_http_up_other_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "finalize http up other request");
}

/*
 * process_header callback: parses the fixed-size up_other_msg_header_t that
 * precedes the response body.
 *
 * On success the header bytes are consumed from u->buffer, the body length
 * is stored as the upstream content length, the status is set to 200 and,
 * for type 1, the client Content-Type is set to JSON.
 *
 * Returns:
 *   NGX_AGAIN                        - the complete header has not arrived yet
 *   NGX_HTTP_UPSTREAM_INVALID_HEADER - the header carries a negative length
 *   NGX_OK                           - header parsed
 */
static ngx_int_t
ngx_http_up_other_process_header(ngx_http_request_t *r)
{
    ngx_buf_t              *b;
    ngx_http_upstream_t    *u;
    up_other_msg_header_t   msg_hdr;

    u = r->upstream;
    b = &u->buffer;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "process header for up other");

    if ((size_t) (b->last - b->pos) < sizeof(up_other_msg_header_t)) {
        return NGX_AGAIN;
    }

    /*
     * Read from b->pos, the same position that is advanced below, and copy
     * the bytes out so the header is never accessed through a casted buffer
     * pointer.
     */
    ngx_memcpy(&msg_hdr, b->pos, sizeof(up_other_msg_header_t));

    if (msg_hdr.length < 0) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "up other upstream sent invalid body length: %d",
                      msg_hdr.length);
        return NGX_HTTP_UPSTREAM_INVALID_HEADER;
    }

    if (msg_hdr.type == 1) {
        ngx_str_set(&r->headers_out.content_type,
                    "application/json; charset=utf-8");
    }

    u->headers_in.content_length_n = msg_hdr.length;

    /* Everything after the header is body data. */
    b->pos += sizeof(up_other_msg_header_t);

    u->headers_in.status_n = 200;
    u->state->status = 200;

    return NGX_OK;
}

/*
 * reinit_request callback: the module keeps no state that would have to be
 * reset, so it always reports success.
 */
static ngx_int_t
ngx_http_up_other_reinit_request(ngx_http_request_t *r)
{
    (void) r;

    return NGX_OK;
}

/*
 * Handler of the "uptream_oth_pass" directive.
 *
 * Registers the directive's argument as the upstream of this location and
 * installs ngx_http_upstream_oth_handler as the location's content handler.
 *
 * Returns NGX_CONF_OK on success, "is duplicate" when the location already
 * has an upstream, or NGX_CONF_ERROR when the upstream cannot be added.
 */
static char *
ngx_http_up_other_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_url_t                          url;
    ngx_str_t                         *args;
    ngx_http_core_loc_conf_t          *core_conf;
    ngx_http_upstream_oth_loc_conf_t  *loc_conf;

    loc_conf = conf;

    if (loc_conf->upstream.upstream != NULL) {
        return "is duplicate";
    }

    /* args[0] is the directive name, args[1] its single argument. */
    args = cf->args->elts;

    ngx_memzero(&url, sizeof(ngx_url_t));
    url.no_resolve = 1;
    url.url = args[1];

    loc_conf->upstream.upstream = ngx_http_upstream_add(cf, &url, 0);
    if (loc_conf->upstream.upstream == NULL) {
        return NGX_CONF_ERROR;
    }

    core_conf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
    core_conf->handler = ngx_http_upstream_oth_handler;

    /* Locations whose name ends in '/' get auto_redirect switched on. */
    if (core_conf->name.data[core_conf->name.len - 1] == '/') {
        core_conf->auto_redirect = 1;
    }

    return NGX_CONF_OK;
}

/*
 * input_filter_init callback: prepares body reading after the header has
 * been parsed.
 *
 * For a 404 upstream status no body is expected and u->length is set to 0.
 * Otherwise u->length is set to the parsed content length and ctx->rest is
 * set to -1; rest is a size_t, so it holds the largest size_t value.
 *
 * Always returns NGX_OK.
 */
static ngx_int_t
ngx_http_up_other_filter_init(void *data)
{
    ngx_http_up_other_ctx_t  *filter_ctx = data;
    ngx_http_upstream_t      *upstream = filter_ctx->request->upstream;

    if (upstream->headers_in.status_n == 404) {
        upstream->length = 0;
        return NGX_OK;
    }

    upstream->length = upstream->headers_in.content_length_n;
    filter_ctx->rest = -1;

    return NGX_OK;
}

/*
 * input_filter callback: hands `bytes` newly received body bytes, located at
 * u->buffer.last, over to the output chain u->out_bufs.
 *
 * u->length is treated as the number of body bytes still expected and is the
 * only counter used here. When it reaches exactly 0 the response is complete
 * and u->keepalive is set. A negative u->length is treated as "length
 * unknown": data is forwarded without any accounting. Bytes beyond the
 * announced length are logged and not forwarded.
 *
 * Unlike the previous version, every chunk is linked into u->out_bufs (not
 * just the first one) and per-call tracing uses the debug log instead of
 * NGX_LOG_EMERG.
 *
 * Returns NGX_OK, or NGX_ERROR if no buffer can be obtained.
 */
static ngx_int_t
ngx_http_up_other_filter(void *data, ssize_t bytes)
{
    ngx_http_up_other_ctx_t  *ctx = data;

    ngx_buf_t            *b;
    ngx_chain_t          *cl, **ll;
    ngx_http_request_t   *r;
    ngx_http_upstream_t  *u;

    r = ctx->request;
    u = r->upstream;
    b = &u->buffer;

    if (u->length == 0) {
        ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
                      "up other upstream sent more data than announced "
                      "in the message header");
        return NGX_OK;
    }

    /* Find the tail of the output chain. */
    for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
        ll = &cl->next;
    }

    cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
    if (cl == NULL) {
        return NGX_ERROR;
    }

    cl->buf->flush = 1;
    cl->buf->memory = 1;
    cl->buf->tag = u->output.tag;

    /* The new data occupies [b->last, b->last + bytes) of the buffer. */
    cl->buf->pos = b->last;
    b->last += bytes;
    cl->buf->last = b->last;

    *ll = cl;

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "up other filter bytes:%z length:%O", bytes, u->length);

    if (u->length < 0) {
        return NGX_OK;
    }

    if ((off_t) bytes > u->length) {
        ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
                      "up other upstream sent more data than announced "
                      "in the message header");

        /* Forward only the announced part of the body. */
        cl->buf->last = cl->buf->pos + u->length;
        u->length = 0;

        return NGX_OK;
    }

    u->length -= bytes;

    if (u->length == 0) {
        /* Exactly the announced body was received. */
        u->keepalive = 1;
    }

    return NGX_OK;
}

/*
 * Content handler of locations configured with "uptream_oth_pass".
 *
 * Discards the client request body, creates the upstream object for the
 * request, installs this module's upstream and input filter callbacks and
 * starts the upstream request.
 *
 * Returns NGX_DONE once the upstream has been started, the result of
 * ngx_http_discard_request_body() if that fails, or
 * NGX_HTTP_INTERNAL_SERVER_ERROR on any other setup failure.
 */
static ngx_int_t
ngx_http_upstream_oth_handler(ngx_http_request_t *r)
{
    ngx_int_t                          rc;
    ngx_http_upstream_t               *u;
    ngx_http_up_other_ctx_t           *ctx;
    ngx_http_upstream_oth_loc_conf_t  *conf;

    rc = ngx_http_discard_request_body(r);
    if (rc != NGX_OK) {
        return rc;
    }

    if (ngx_http_upstream_create(r) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    u = r->upstream;

    ngx_str_set(&u->schema, "upstream_other://");
    u->output.tag = (ngx_buf_tag_t) &ngx_http_upstream_oth_module;

    conf = ngx_http_get_module_loc_conf(r, ngx_http_upstream_oth_module);
    if (conf == NULL) {
        /* no format arguments, so use the zero-argument debug macro */
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "no conf");
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    u->conf = &conf->upstream;

    u->create_request = ngx_http_up_other_create_request;
    u->reinit_request = ngx_http_up_other_reinit_request;
    u->process_header = ngx_http_up_other_process_header;
    u->abort_request = ngx_http_up_other_abort_request;
    u->finalize_request = ngx_http_up_other_finalize_request;

    /* Zeroed allocation, so ctx->rest never holds an indeterminate value. */
    ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_up_other_ctx_t));
    if (ctx == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    ctx->request = r;

    ngx_http_set_ctx(r, ctx, ngx_http_upstream_oth_module);

    u->input_filter_init = ngx_http_up_other_filter_init;
    u->input_filter = ngx_http_up_other_filter;
    u->input_filter_ctx = ctx;

    /*
     * Take an extra reference on the main request before handing it to the
     * upstream machinery; NGX_DONE is returned right after the upstream has
     * been started.
     */
    r->main->count++;

    ngx_http_upstream_init(r);

    return NGX_DONE;
}

/*
 * Allocates the per-location configuration and fills in the hard-coded
 * upstream settings: 60000 for the connect/send/read timeouts, buffering
 * switched off and page-sized buffers.
 *
 * Returns the new configuration, or NULL if the allocation fails. NULL (not
 * NGX_CONF_ERROR) is returned on failure so that the caller's NULL check
 * detects it instead of using (void *) -1 as a configuration pointer.
 */
static void *
ngx_http_upstream_oth_create_loc_conf(ngx_conf_t *cf)
{
    ngx_http_upstream_oth_loc_conf_t  *conf;

    conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_oth_loc_conf_t));
    if (conf == NULL) {
        return NULL;
    }

    conf->upstream.connect_timeout = 60000;
    conf->upstream.send_timeout = 60000;
    conf->upstream.read_timeout = 60000;
    conf->upstream.store_access = 0600;

    /* The response is passed to the client without buffering. */
    conf->upstream.buffering = 0;
    conf->upstream.bufs.num = 8;
    conf->upstream.bufs.size = ngx_pagesize;
    conf->upstream.buffer_size = ngx_pagesize;
    conf->upstream.busy_buffers_size = 2 * ngx_pagesize;
    conf->upstream.temp_file_write_size = 2 * ngx_pagesize;
    conf->upstream.max_temp_file_size = 1024 * 1024 * 1024;

    conf->upstream.hide_headers = NGX_CONF_UNSET_PTR;
    conf->upstream.pass_headers = NGX_CONF_UNSET_PTR;

    return conf;
}

/*
 * Merge hook for the location configuration. Nothing is inherited from the
 * parent level: both arguments are left untouched and NGX_CONF_OK is
 * returned.
 */
static char *
ngx_http_upstream_oth_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
    return NGX_CONF_OK;
}

 

相关内容

    暂无相关文章