6
\$\begingroup\$

I'm working on a project to clone a subset of requests going to our production servers to one or more non-production endpoints. I opted to do this in Nginx and Lua because at the time, I could not find a tool that fit the requirements I needed, namely, only forwarding all requests for a subset of session IDs (as opposed to a percentage of all requests).

Initially, I was simply using ngx.timer.at to dispatch the subrequest, but I was informed that the server could run out of threads if the destination server was unavailable. So I looked at lua-resty-logger-socket and used the code that they use to buffer requests and modified it for my usage.

This is my first Lua project in Nginx, and actually my first time touching Lua years, and nobody at work knows Lua well enough to give any advice, so I'd appreciate if anyone could point out any flaws in my methods or things that can be done better.

example nginx.conf:

location = /GenerateToken {
 set $session_token "";
 access_by_lua_file store_token_from_uri.lua;
 body_filter_by_lua_file store_token_from_body.lua;
 log_by_lua_block {
 local dispatcher = require "dispatcher"
 if not dispatcher.initialized() then
 local ok, err = dispatcher.init {
 servers = {
 ["127.0.0.1:" .. ngx.var.server_port] = .99
 }
 }
 if not ok then
 ngx.log(ngx.ERR, "failed to initialize the logger: ",
 err)
 return
 end
 end
 dispatcher.queue_request(ngx.var.session_token)
 }
}

store_token_from_uri.lua

ngx.req.read_body()
-- Extract the token from the URI if it is present
-- add "set $session_token "";" before to initialize variable
ngx.ctx.args = ngx.req.get_uri_args()
-- If the id argument is present, in the query string, that is our token.
if ngx.ctx.args.token then
ngx.var.session_token = ngx.ctx.args.token
elseif ngx.ctx.args.id then
 ngx.var.session_token = ngx.ctx.args.id
end

store_token_from_body.lua

-- Stores token from the response body in an nginx variable
-- Runs in body_filter_by_lua_file
-- For GenerateToken requests, the token must be extracted from the response.
if string.find(ngx.var.uri, '/GenerateToken') then
 -- Build the response body from the buffer
 local resp_body = string.sub(ngx.arg[1], 1, 8000)
 ngx.ctx.buffered = (ngx.ctx.buffered or "") .. resp_body
 if ngx.arg[2] then
 -- For GET requests with a callback, extract the token from the callback response
 if string.find(ngx.var.uri, '/GenerateToken.js') and ngx.ctx.args.callback then
 _, _, ngx.var.session_token = string.find(ngx.ctx.buffered, "(%x+-%x+-%x+-%x+%-%x+)")
 -- Otherwise, the response should just contain the token.
 else
 -- Trim the response of any trailing newlines
 ngx.var.session_token = ngx.ctx.buffered:gsub("^%s*(.-)%s*$", "%1")
 end
 -- Add the token to the query string so the remote server will not re-create a new token
 ngx.ctx.args.token = ngx.var.session_token
 ngx.req.set_uri_args(ngx.ctx.args)
 end
end

dispatcher.lua

-- Add to log_by_lua_file
local http = require "resty.http"
local insert = table.insert
local timer_at = ngx.timer.at
local ngx_log = ngx.log
local type = type
local pairs = pairs
local ipairs = ipairs
local debug = ngx.config.debug
local DEBUG = ngx.DEBUG
-- table.new(narr, nrec)
local succ, new_tab = pcall(require, "table.new")
if not succ then
 new_tab = function () return {} end
end
local _M = new_tab(0, 4)
-- user config
local servers = {}
-- 1MB
local flush_limit = 1048576
-- drop at 50MB
local drop_limit = 52428800
-- reuse buffer for at most 10000 times
local max_buffer_reuse = 10000
local periodic_flush
local need_periodic_flush
local flushing
local dispatcher_initialized
local counter = 0
-- internal variables
local buffer_size = 0
-- 1st level buffer, it stores incoming requests
local incoming_buffer = new_tab(20000, 0)
-- number of requests in current 1st level buffer, starts from 0
local incoming_buffer_index = 0
-- 2nd level buffer, it stores requests ready to be sent out
local send_buffer = new_tab(1000, 0)
local send_buffer_index = 0
local send_buffer_size = 0
---
-- Populates the second-level buffer
local function _prepare_send_buffer()
 for i=1, incoming_buffer_index do
 send_buffer_index = send_buffer_index + 1
 send_buffer[i] = incoming_buffer[i]
 end
 send_buffer_size = buffer_size
 incoming_buffer_index = 0
 counter = counter + 1
 if counter > max_buffer_reuse then
 incoming_buffer = new_tab(20000, 0)
 send_buffer = new_tab(1000, 0)
 counter = 0
 if debug then
 ngx_log(DEBUG, "request buffer reuse limit (" .. max_buffer_reuse
 .. ") reached, create a new \"request_buffer_data\"")
 end
 end
end
-- this is expensive and should only be used to tidy up in case of an error
local function _pop_send_buffer(count)
 for i=1, count do
 local packet = send_buffer.remove(i)
 send_buffer_index = send_buffer_index - 1
 send_buffer_size = send_buffer_size - #packet
 end
end
---
-- Performs the actual buffered subrequests
local function _do_flush()
 local ok, err
 local errors = ""
 local all_ok = true
 for i ,request in ipairs(send_buffer) do
 local httpc = http.new()
 httpc:set_timeout(0)
 httpc:connect(request[1], request[2] or 80)
 request[5].host = request[1]
 -- Use send_request instead of request to prevent needing to read the response back.
 ok, err = httpc:send_request{
 method = request[4],
 path = request[3],
 headers = request[5],
 body = request[6]
 }
 if not ok then
 -- ensure we don't resend packets later that we've already sent
 _pop_send_buffer(i - 1)
 all_ok = false
 end
 if err then
 errors = errors .. " ".. err
 end
 end
 return all_ok, errors
end
---
-- Determines if a flush is required
local function _need_flush()
 if incoming_buffer_index > 0 or send_buffer_index > 0 then
 return true
 end
 return false
end
---
-- Locks the buffer flush so only one flush can be happening at once
local function _flush_lock()
 if not flushing then
 if debug then
 ngx_log(DEBUG, "flush lock acquired")
 end
 flushing = true
 return true
 end
 return false
end
---
-- Unlocks the buffer flush once finished
local function _flush_unlock()
 if debug then
 ngx_log(DEBUG, "flush lock released")
 end
 flushing = false
end
---
-- Performs the buffer flush if we can
local function _flush()
 local ok, err
 -- pre check
 if not _flush_lock() then
 if debug then
 ngx_log(DEBUG, "previous flush not finished")
 end
 -- do this later
 return true
 end
 if not _need_flush() then
 if debug then
 ngx_log(DEBUG, "no need to flush:", incoming_buffer_index)
 end
 _flush_unlock()
 return true
 end
 -- start flushing
 if debug then
 ngx_log(DEBUG, "start flushing")
 end
 local bytes
 if incoming_buffer_index > 0 then
 _prepare_send_buffer()
 end
 ok, err = _do_flush()
 _flush_unlock()
 if not ok then
 local err_msg = "buffer flush failed: " .. err
 return nil, err_msg
 else
 if debug then
 ngx_log(DEBUG, "request sent")
 end
 end
 buffer_size = buffer_size - send_buffer_size
 return ok
end
---
-- If periodic flush is enabled, determines if a flush is needed. A periodic flush
-- will only happen if no flush had occured during the waiting period between periodic
-- flushes
local function _periodic_flush()
 if need_periodic_flush then
 -- no regular flush happened after periodic flush timer had been set
 if debug then
 ngx_log(DEBUG, "performing periodic flush")
 end
 _flush()
 else
 if debug then
 ngx_log(DEBUG, "no need to perform periodic flush: regular flush "
 .. "happened before")
 end
 need_periodic_flush = true
 end
 timer_at(periodic_flush, _periodic_flush)
end
---
-- Performs the flush of the request buffer. The flush is done via
-- a 0-second timer so it can be performed in the background
local function _flush_buffer()
 local ok, err = timer_at(0, _flush)
 need_periodic_flush = false
 if not ok then
 return nil, err
 end
end
---
-- Writes the subrequest to be made to the buffer
-- @param hostname
-- @param port
-- @param uri
-- @param method
-- @param headers
-- @param body
-- @param req_length
local function _write_buffer(hostname, port, uri, method, headers, body, req_length)
 incoming_buffer_index = incoming_buffer_index + 1
 incoming_buffer[incoming_buffer_index] = {
 hostname,
 port,
 uri,
 method,
 headers,
 body,
 req_length
 }
 buffer_size = buffer_size + req_length
 return buffer_size
end
---
-- Queue up this request to be replayed if the token passes the deterministic selector
-- @param token
function _M.queue_request(token)
 if not dispatcher_initialized then
 return false
 end
 local bytes
 -- If we have a token stored, clone this request to other servers
 if token and token ~= "" then
 for hostname, percentage in ipairs(servers) do
 -- Deterministic way of selecting a percentage of tokens. It takes the last 4 bytes of the
 -- uuid, converts it to an integer, and modulos it by 10K, (which should generate a number
 -- between 0 and 9999). If it's less than the configured percentage for this host multiplied
 -- by 10K, then this request is a candidate for cloning. We use 10K as this would give us a
 -- precision of .01%
 if (tonumber(token:sub(-8), 16) % 10000) < (percentage * 10000) then
 -- Slit the IP and port up
 local remote_addr = {}
 for part in string.gmatch(hostname, '([^:]+)') do
 table.insert(remote_addr, part)
 end
 -- Append the query string if we have it, othwerise just use the URI
 local uri = ngx.var.args and ngx.var.uri .. "?" .. ngx.var.args or ngx.var.uri
 local method = ngx.req.get_method()
 local headers = ngx.req.get_headers()
 local body = ngx.req.get_body_data()
 local req_length = ngx.var.request_length
 if (debug) then
 ngx.update_time()
 ngx_log(DEBUG, ngx.now(), ":request length: " .. req_length)
 end
 -- Flush if we're beyond the flush limit or drop limit but if this request will put us beyond thee
 -- drop limit, then don't queue up this request.
 if (req_length + buffer_size < flush_limit) then
 _write_buffer(remote_addr[1], remote_addr[2], uri, method, headers, body, req_length)
 return req_length
 elseif (req_length + buffer_size <= drop_limit) then
 _write_buffer(remote_addr[1], remote_addr[2], uri, method, headers, body, req_length)
 _flush_buffer()
 return req_length
 else
 -- this log message doesn't fit in buffer, drop it
 _flush_buffer()
 if (debug) then
 ngx_log(DEBUG, "request buffer is full, this request will be dropped")
 end
 return false
 end
 end
 end
 end
end
---
-- Initializes the dispatcher module, validates and applies the configuration settings passed
-- @param config
function _M.init(config)
 if (type(config) ~= "table") then
 return nil, "user_config must be a table"
 end
 for k, v in pairs(config) do
 if k == "servers" then
 if type(v) ~= "table" then
 return nil, '"servers" must be a table'
 end
 servers = v
 elseif k == "flush_limit" then
 if type(v) ~= "number" or v < 0 then
 return nil, 'invalid "flush_limit"'
 end
 flush_limit = v
 elseif k == "drop_limit" then
 if type(v) ~= "number" or v < 0 then
 return nil, 'invalid "drop_limit"'
 end
 drop_limit = v
 elseif k == "max_buffer_reuse" then
 if type(v) ~= "number" or v < 0 then
 return nil, 'invalid "max_buffer_reuse"'
 end
 max_buffer_reuse = v
 elseif k == "periodic_flush" then
 if type(v) ~= "number" or v < 0 then
 return nil, 'invalid "periodic_flush"'
 end
 periodic_flush = v
 end
 end
 if (flush_limit >= drop_limit) then
 return nil, "\"flush_limit\" should be < \"drop_limit\""
 end
 flushing = false
 dispatcher_initialized = true
 if periodic_flush then
 if debug then
 ngx_log(DEBUG, "periodic flush enabled for every " .. periodic_flush .. " seconds")
 end
 need_periodic_flush = true
 timer_at(periodic_flush, _periodic_flush)
 end
 return dispatcher_initialized
end
function _M.initialized()
 return dispatcher_initialized
end
_M.flush = _flush
return _M
Jamal
35.2k13 gold badges134 silver badges238 bronze badges
asked Nov 9, 2015 at 20:03
\$\endgroup\$

1 Answer 1

3
\$\begingroup\$

The ngx.var.session_token can be easily setup in store_token_from_uri.lua as follows:

ngx.var.session_token = ngx.ctx.args.token or ngx.ctx.args.id or ngx.var.session_token

where, the value of ngx.var.session_token will be restored in case both ngx.ctx.args.token and ngx.ctx.args.id are nil. The if-elseif blocks are not required anymore.


If you do not want the start and end positions when using string.find, use the string.match function instead. The same can be used instead of your gsub call later in store_token_from_body.lua file. The characters - and . in a lua-pattern have special meaning. Escape them using a % character for a literal matching. Therefore:

if string.find(ngx.var.uri, '/GenerateToken') then
 local resp_body = ngx.arg[1]:sub(1, 8000)
 ngx.ctx.buffered = (ngx.ctx.buffered or "") .. resp_body
 if ngx.arg[2] then
 -- For GET requests with a callback, extract the token from the callback response
 if string.find(ngx.var.uri, '/GenerateToken%.js') and ngx.ctx.args.callback then
 ngx.var.session_token = ngx.ctx.buffered:match "(%x+%-%x+%-%x+%-%x+%-%x+)"
 -- Otherwise, the response should just contain the token.
 else
 -- Trim the response of any trailing newlines
 ngx.var.session_token = ngx.ctx.buffered:match "^%s*(.-)%s*$"
 end
 -- Add the token to the query string so the remote server will not re-create a new token
 ngx.ctx.args.token = ngx.var.session_token
 ngx.req.set_uri_args(ngx.ctx.args)
 end
end

In the dispatcher.lua file, the code snippet for debug logging, can be put inside a separate function:

function debug_log( msg, ... )
 if debug then
 ngx_log(DEBUG, msg, ...)
 end
end

In the _do_flush function, you are concatenating a string buffer repeatedly. Instead, store all errors inside a table, and return table.concat(errors, " "):

local errors = {}
...
 if err then
 table.insert(errors, err)
 end
...
return all_ok, table.concat(errors, " ")
answered Nov 12, 2015 at 10:11
\$\endgroup\$
1
  • \$\begingroup\$ Thanks for the info. table.new is a LuaJIT-ism that pre-allocates memory for a table. \$\endgroup\$ Commented Nov 12, 2015 at 18:34

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.