From b363b5abe4a785a8f217d1cd9082c3c23288cbc8 Mon Sep 17 00:00:00 2001 From: "libo.huang" Date: Thu, 1 Feb 2018 11:23:15 +0800 Subject: [PATCH] try_register --- lib/resty/checkups/api.lua | 35 +++- lib/resty/checkups/consistent_hash.lua | 41 +++-- lib/resty/checkups/round_robin.lua | 20 ++- lib/resty/checkups/try.lua | 217 +++++++++++++++---------- 4 files changed, 202 insertions(+), 111 deletions(-) diff --git a/lib/resty/checkups/api.lua b/lib/resty/checkups/api.lua index 849a6b0..050bf8a 100644 --- a/lib/resty/checkups/api.lua +++ b/lib/resty/checkups/api.lua @@ -170,6 +170,16 @@ function _M.get_ups_timeout(skey) end +function _M.get_ups(skey) + if not skey then + return + end + + local ups = base.upstream.checkups[skey] + return ups +end + + function _M.create_checker() local phase = get_phase() if phase ~= "init_worker" then @@ -233,14 +243,25 @@ local function gen_upstream(skey, upstream) return nil, "cluster invalid" end else - -- only servers local dyupstream, err = dyconfig.do_get_upstream(skey) if err then return nil, err end dyupstream = dyupstream or {} - dyupstream.cluster = upstream + if upstream.servers then + -- store config + for k, v in pairs(upstream) do + if k ~= "servers" then + dyupstream[k] = v + else + dyupstream.cluster = { { servers = v } } + end + end + else + -- only cluster + dyupstream.cluster = upstream + end ups = dyupstream end @@ -322,4 +343,14 @@ function _M.delete_upstream(skey) end +function _M.try_register(name, module) + return try.register(name, module) +end + + +function _M.try_unregister(name) + return try.unregister(name) +end + + return _M diff --git a/lib/resty/checkups/consistent_hash.lua b/lib/resty/checkups/consistent_hash.lua index b828e1f..c060a81 100644 --- a/lib/resty/checkups/consistent_hash.lua +++ b/lib/resty/checkups/consistent_hash.lua @@ -4,6 +4,8 @@ local floor = math.floor local str_byte = string.byte local tab_sort = table.sort local tab_insert = table.insert +local ipairs = ipairs +local type = type local _M = { _VERSION = "0.11" } @@ -22,7 +24,7 @@ local function hash_string(str) end -local function init_consistent_hash_state(servers) +local function init_state(servers) local weight_sum = 0 for _, srv in ipairs(servers) do weight_sum = weight_sum + (srv.weight or 1) @@ -62,37 +64,52 @@ local function binary_search(circle, key) end -function _M.next_consistent_hash_server(servers, peer_cb, hash_key) - local is_tab = require "resty.checkups.base".is_tab - servers.chash = is_tab(servers.chash) and servers.chash - or init_consistent_hash_state(servers) +local function next_server(servers, peer_cb, opts) + servers.chash = type(servers.chash) == "table" and servers.chash + or init_state(servers) local chash = servers.chash if chash.members == 1 then if peer_cb(1, servers[1]) then - return servers[1] + return 1, servers[1] end - return nil, "consistent hash: no servers available" + return nil, nil, nil, "consistent hash: no servers available" end local circle = chash.circle - local st = binary_search(circle, hash_string(hash_key)) + local st = binary_search(circle, hash_string(opts.hash_key)) local size = #circle local ed = st + size - 1 for i = st, ed do -- TODO: algorithm O(n) local idx = circle[(i - 1) % size + 1][2] if peer_cb(idx, servers[idx]) then - return servers[idx] + return idx, servers[idx] end end - return nil, "consistent hash: no servers available" + return nil, nil, nil, "consistent hash: no servers available" end -function _M.free_consitent_hash_server(srv, failed) - return +local function gen_opts(ups, opts) + local key + local mode = ups.mode + if mode == "hash" then + key = opts.hash_key or ngx.var.uri + elseif mode == "url_hash" then + key = ngx.var.uri + elseif mode == "ip_hash" then + key = ngx.var.remote_addr + elseif mode == "header_hash" then + key = ngx.var.http_x_hash_key or ngx.var.uri + end + return { hash_key=key } +end + +function _M.ipairsrvs(servers, peer_cb, ups, opts) + local mopts = gen_opts(ups, opts) + return function() return next_server(servers, peer_cb, mopts) end end diff --git a/lib/resty/checkups/round_robin.lua b/lib/resty/checkups/round_robin.lua index fa3f5ab..70bb213 100644 --- a/lib/resty/checkups/round_robin.lua +++ b/lib/resty/checkups/round_robin.lua @@ -13,19 +13,20 @@ return: - (table) server - (string) error --]] -function _M.next_round_robin_server(servers, peer_cb) +local function next_server(servers, peer_cb) local srvs_cnt = #servers if srvs_cnt == 1 then if peer_cb(1, servers[1]) then - return servers[1], nil + return 1, servers[1], nil end - return nil, "round robin: no servers available" + return nil, nil, nil, "round robin: no servers available" end -- select round robin server local best + local best_idx local max_weight local weight_sum = 0 for idx = 1, srvs_cnt do @@ -46,22 +47,22 @@ function _M.next_round_robin_server(servers, peer_cb) if not max_weight or srv.current_weight > max_weight then max_weight = srv.current_weight best = srv + best_idx = idx end end end if not best then - return nil, "round robin: no servers available" + return nil, nil, nil, "round robin: no servers available" end best.current_weight = best.current_weight - weight_sum - return best, nil + return best_idx, best end - -function _M.free_round_robin_server(srv, failed) +function _M.free_server(srv, failed) if not failed then return end @@ -70,4 +71,9 @@ function _M.free_round_robin_server(srv, failed) end +function _M.ipairsrvs(servers, peer_cb, ups, opts) + return function() return next_server(servers, peer_cb) end +end + + return _M diff --git a/lib/resty/checkups/try.lua b/lib/resty/checkups/try.lua index fedb93b..f908a1b 100644 --- a/lib/resty/checkups/try.lua +++ b/lib/resty/checkups/try.lua @@ -1,18 +1,20 @@ -- Copyright (C) 2014-2016, UPYUN Inc. -local cjson = require "cjson.safe" local round_robin = require "resty.checkups.round_robin" local consistent_hash = require "resty.checkups.consistent_hash" local base = require "resty.checkups.base" -local max = math.max -local sqrt = math.sqrt -local floor = math.floor -local tab_insert = table.insert +local type = type +local ipairs = ipairs local tostring = tostring +local tab_insert = table.insert +local str_sub = string.sub +local str_find = string.find +local str_format = string.format -local update_time = ngx.update_time -local now = ngx.now +local now = ngx.now +local log = ngx.log +local ERR = ngx.ERR local _M = { _VERSION = "0.11" } @@ -21,11 +23,70 @@ local is_tab = base.is_tab local NEED_RETRY = 0 local REQUEST_SUCCESS = 1 local EXCESS_TRY_LIMIT = 2 +local RETRY_DONE = 3 +_M.NEED_RETRY = NEED_RETRY +_M.RETRY_DONE = RETRY_DONE +_M.REQUEST_SUCCESS = REQUEST_SUCCESS +_M.EXCESS_TRY_LIMIT = EXCESS_TRY_LIMIT + +local reg = { + hash = consistent_hash, + default = round_robin, +} + +function _M.register(name, module) + if type(name) ~= "string" or str_find(name, "_") then + return false, "invalid name" + end + + if type(module) ~= "table" then + return false, "invalid module" + end + + if not module.ipairsrvs and not module.itercls then + return false, "invalid module" + end + + reg[name] = module + return true +end + + +function _M.unregister(name) + reg[name] = nil + return true +end + + +local function default_retry(ups, try_limit) + local statuses + if ups.typ == "http" and is_tab(ups.http_opts) then + statuses = ups.http_opts.statuses + end + local try_cnt = 0 + local retry_cb = function(res) + if is_tab(res) and res.status and is_tab(statuses) then + if statuses[tostring(res.status)] ~= false then + return REQUEST_SUCCESS + end + elseif res then + return REQUEST_SUCCESS + end + + try_cnt = try_cnt + 1 + if try_cnt >= try_limit then + return EXCESS_TRY_LIMIT + end + + return NEED_RETRY + end + return retry_cb +end -local function prepare_callbacks(skey, opts) - local ups = base.upstream.checkups[skey] + +local function prepare_callbacks(skey, ups, opts, module) -- calculate count of cluster and server local cls_keys = {} -- string key or number level local srvs_cnt = 0 @@ -44,7 +105,6 @@ local function prepare_callbacks(skey, opts) end end - -- get next level cluster local cls_key local cls_index = 0 @@ -59,28 +119,14 @@ local function prepare_callbacks(skey, opts) return ups.cluster[cls_key] end - - -- get next select server - local mode = ups.mode - local next_server_func = round_robin.next_round_robin_server - local key - if mode ~= nil then - if mode == "hash" then - key = opts.hash_key or ngx.var.uri - elseif mode == "url_hash" then - key = ngx.var.uri - elseif mode == "ip_hash" then - key = ngx.var.remote_addr - elseif mode == "header_hash" then - key = ngx.var.http_x_hash_key or ngx.var.uri - end - - next_server_func = consistent_hash.next_consistent_hash_server - end - local next_server_cb = function(servers, peer_cb) - return next_server_func(servers, peer_cb, key) + local try_limit + if module.try_limit then + try_limit = module.try_limit(ups, opts) + else + try_limit = opts.try or ups.try end + try_limit = try_limit or srvs_cnt -- check whether ther server is available local bad_servers = {} @@ -102,32 +148,8 @@ local function prepare_callbacks(skey, opts) end end - - -- check whether need retry - local statuses - if ups.typ == "http" and is_tab(ups.http_opts) then - statuses = ups.http_opts.statuses - end - local try_cnt = 0 - local try_limit = opts.try or ups.try or srvs_cnt - local retry_cb = function(res) - if is_tab(res) and res.status and is_tab(statuses) then - if statuses[tostring(res.status)] ~= false then - return REQUEST_SUCCESS - end - elseif res then - return REQUEST_SUCCESS - end - - try_cnt = try_cnt + 1 - if try_cnt >= try_limit then - return EXCESS_TRY_LIMIT - end - - return NEED_RETRY - end - - + local retry_func = module.retry_cb or default_retry + local retry_cb = retry_func(ups, try_limit) -- check whether try_time has over amount_request_time local try_time = 0 local try_time_limit = opts.try_timeout or ups.try_timeout or 0 @@ -142,23 +164,18 @@ local function prepare_callbacks(skey, opts) return NEED_RETRY end - -- set some status - local free_server_func = round_robin.free_round_robin_server - if mode == "hash" then - free_server_func = consistent_hash.free_consitent_hash_server - end local set_status_cb = function(srv, failed) local key = ("%s:%s:%s"):format(cls_key, srv.host, srv.port) bad_servers[key] = failed base.set_srv_status(skey, srv, failed) - free_server_func(srv, failed) + if module.free_server then + module.free_server(srv, failed) + end end - return { next_cluster_cb = next_cluster_cb, - next_server_cb = next_server_cb, retry_cb = retry_cb, peer_cb = peer_cb, set_status_cb = set_status_cb, @@ -181,20 +198,57 @@ return: - (string) error --]] function _M.try_cluster(skey, request_cb, opts) - local callbacks = prepare_callbacks(skey, opts) + local ups = base.upstream.checkups[skey] + local mode = ups.mode or "default" + local _, to = str_find(mode, "_") + if to then + mode = str_sub(mode, to + 1) + end + local module = reg[mode] + print("mode:", mode, ", module:", tostring(module)) + local callbacks = prepare_callbacks(skey, ups, opts, module) local next_cluster_cb = callbacks.next_cluster_cb - local next_server_cb = callbacks.next_server_cb local peer_cb = callbacks.peer_cb local retry_cb = callbacks.retry_cb local set_status_cb = callbacks.set_status_cb local try_time_cb = callbacks.try_time_cb - -- iter servers function - local itersrvs = function(servers, peer_cb) - return function() return next_server_cb(servers, peer_cb) end + local request_feedback = function(start_time, srv, res, err) + -- check whether need retry + local end_time = now() + local delta_time = end_time - start_time + + local feedback = retry_cb(res) + set_status_cb(srv, feedback ~= REQUEST_SUCCESS) -- set some status + if feedback ~= NEED_RETRY then + return RETRY_DONE, res, err + end + + local feedback_try_time = try_time_cb(delta_time) + if feedback_try_time ~= NEED_RETRY then + return RETRY_DONE, nil, "try_timeout excceed" + end + return NEED_RETRY end + local default_itercls = function(cls, peer_cb, request_cb, request_feedback) + for _, srv, opts, err in module.ipairsrvs(cls.servers, peer_cb, ups, opts) do + if err then + log(ERR, str_format("iter err: %s", err)) + return nil, err + end + local start_time = now() + local res, err = request_cb(srv.host, srv.port, opts) + local retry, res, err = request_feedback(start_time, srv, res, err) + if retry == RETRY_DONE then + return RETRY_DONE, res, err + end + end + end + + local itercls = module.itercls or default_itercls + local res, err = nil, "no servers available" repeat -- get next level/key cluster @@ -202,27 +256,10 @@ function _M.try_cluster(skey, request_cb, opts) if not cls then break end - - for srv, err in itersrvs(cls.servers, peer_cb) do - -- exec request callback by server - local start_time = now() - res, err = request_cb(srv.host, srv.port) - - -- check whether need retry - local end_time = now() - local delta_time = end_time - start_time - - local feedback = retry_cb(res) - set_status_cb(srv, feedback ~= REQUEST_SUCCESS) -- set some status - if feedback ~= NEED_RETRY then - return res, err - end - - local feedback_try_time = try_time_cb(delta_time) - if feedback_try_time ~= NEED_RETRY then - return nil, "try_timeout excceed" - end - end + local retry, res, err = itercls(cls, peer_cb, request_cb, request_feedback) + if retry == RETRY_DONE then + return res, err + end until false return res, err