postgresql-logger插件开发
安装lua
不管是自己从centos镜像安装apisix还是直接用docker官方镜像,里面都预装好了lua。但是为了保证后续用luarock安装postgresql模块不会安错位置,最好重新装一遍lua
添加环境变量
vi /etc/profile
# 结尾追加
LUA_MAJOR_VERSION=5.3
export LUA_MAJOR_VERSION
LUA_MINOR_VERSION=4
export LUA_MINOR_VERSION
LUA_VERSION=5.3.4
export LUA_VERSION
WITH_LUA=/usr/local/
export WITH_LUA
LUA_LIB=/usr/local/lib/lua
export LUA_LIB
LUA_INCLUDE=/usr/local/include
export LUA_INCLUDE
LUAROCKS_VERSION=2.4.2
export LUAROCKS_VERSION
LUAROCKS_INSTALL=luarocks-2.4.2
export LUAROCKS_INSTALL
TMP_LOC=/tmp/luarocks
export TMP_LOC
安装lua
yum install -y wget make gcc readline-devel unzip zip git
wget http://www.lua.org/ftp/lua-5.3.4.tar.gz
tar -zxvf lua-5.3.4.tar.gz && cd lua-5.3.4
make linux test && make install
rm -rf /lua-5.3.4*
插件源码
插件可以在gitee上下载
-- 该插件以apisix内置的http-logger插件为基础开发,很多运行逻辑是按http-logger的运行逻辑走的
local batch_processor = require("apisix.utils.batch-processor")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http")
local url = require("net.url")
local plugin = require("apisix.plugin")
local ngx = ngx
local tostring = tostring
local pairs = pairs
local ipairs = ipairs
local str_byte = string.byte
local timer_at = ngx.timer.at
local plugin_name = "postgresql-logger"
local stale_timer_running = false
local buffers = {}
local lru_log_format = core.lrucache.new({
ttl = 300, count = 512
})
--导入依赖
local luasql = require "luasql.postgres"
local schema = {
type = "object",
properties = {
host = { type = "string" }, -- postgresql服务器ip
port = { type = "string" }, -- postgresql服务端口
dbname = { type = "string" }, -- postgresql日志表所在数据库
username = { type = "string" }, -- postgresql用户名
password = { type = "string" }, -- postgresql密码
tablename = { type = "string" }, -- postgresql日志表
uri = core.schema.uri_def,
auth_header = { type = "string", default = "" },
timeout = { type = "integer", minimum = 1, default = 3 },
name = { type = "string", default = "postgresql-logger" },-- batch_processor的参数
max_retry_count = { type = "integer", minimum = 0, default = 0 },-- batch_processor的参数
retry_delay = { type = "integer", minimum = 0, default = 1 },-- batch_processor的参数
buffer_duration = { type = "integer", minimum = 1, default = 60 },-- batch_processor的参数
inactive_timeout = { type = "integer", minimum = 1, default = 5 },-- batch_processor的参数
batch_max_size = { type = "integer", minimum = 1, default = 1000 },-- batch_processor的参数
include_req_body = { type = "boolean", default = false },
concat_method = { type = "string", default = "json",
enum = { "json", "new_line" } }
},
required = { "host", "port", "dbname", "username", "password","tablename" }
--apisix插件必须配置的参数,在apisix开启时会检查每个插件的配置是否有误,插件有误则该转发会失效
}
-- 日志格式元数据
local metadata_schema = {
type = "object",
properties = {
log_format = {
type = "object",
default = {
["host"] = "$host",
["@timestamp"] = "$time_iso8601",
["client_ip"] = "$remote_addr",
["route_id"] = "$route_id",
["route_name"] = "$route_id",
["service_id"] = "$route_id",
["service_name"] = "$route_id",
["client_ip"] = "$remote_addr",
},
},
},
additionalProperties = false,
}
-- 插件元数据
local _M = {
version = 0.1,
priority = 410,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
}
--conf就是我们进行插件配置的信息,这些信息会和schema对比,验证配置的格式合法性和是否缺失等等
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
--发送日志,conf是插件配置
local function send_postgresql_data(conf, log_message)
local err_msg -- 错误信息
local res = true -- 发送是否成功
-- 获取环境
local env = luasql.postgres()
if (nil == env) then
res = false;
err_msg = "luasql-postgres environment failure";
return res ,err_msg;
end
-- port是string类型,转为number类型,tonumber(conf.port)
local conn = env:connect(conf.dbname, conf.username, conf.password, conf.host, tonumber(conf.port));
if (nil == conn) then
res = false;
err_msg = "connect postgresql server failure";
env:close();
return res ,err_msg;
end
local resultLine = conn:execute("INSERT INTO "..conf.tablename.." (log) VALUES ('"..log_message.."');")
-- 插入行数不为1,则插入失败
if (1.0 ~= resultLine) then
res = false;
err_msg = "insert into "..conf.tablename.." failure in "..conf.dbname.." of database";
conn:close();
env:close();
return res ,err_msg;
end
-- 资源关闭
conn:close();
env:close();
return res, err_msg
end
--生成日志格式,metadata是插件元数据配置
local function gen_log_format(metadata)
local log_format = {}
--如果我们没有设置格式,,apisix也没有默认则为空则
if metadata == nil then
return log_format
end
for k, var_name in pairs(metadata.value.log_format) do
if var_name:byte(1, 1) == str_byte("$") then
-- $开头则获取环境变量
log_format[k] = { true, var_name:sub(2) }
else
log_format[k] = { false, var_name }
end
end
--向apisix报告日志格式更新的行为
core.log.info("log_format: ", core.json.delay_encode(log_format))
return log_format
end
-- 移除日志缓存,缓存本质上是个table,保证内存不会占用过多
local function remove_stale_objects(premature)
if premature then
return
end
for key, batch in ipairs(buffers) do
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
core.log.warn("removing batch processor stale object, conf: ",
core.json.delay_encode(key))
buffers[key] = nil -- 置为nil,清空数据
end
end
stale_timer_running = false -- 重启移除缓存标识
end
--apisix最终去执行的是这个方法去记录日志
function _M.log(conf, ctx)
--获取插件元数据
local metadata = plugin.plugin_metadata(plugin_name)
--apisix记录运行日志
core.log.info("metadata: ", core.json.delay_encode(metadata))
local entry
-- 获取日志格式
local log_format = lru_log_format(metadata or "", nil, gen_log_format,
metadata)
-- 用该日志格式,获取日志内容
if core.table.nkeys(log_format) > 0 then
entry = core.table.new(0, core.table.nkeys(log_format))
for k, var_attr in pairs(log_format) do
if var_attr[1] then
entry[k] = ctx.var[var_attr[2]]
else
entry[k] = var_attr[2]
end
end
local matched_route = ctx.matched_route and ctx.matched_route.value
if matched_route then
entry.service_id = matched_route.service_id
entry.route_id = matched_route.id
end
else
entry = log_util.get_full_log(ngx, conf)
end
-- 无路由匹配的日志
if not entry.route_id then
entry.route_id = "no-matched"
end
-- 是否清空日志缓存?
if not stale_timer_running then
-- 有日志在缓存里呆了超过30分钟,直接移除
timer_at(1800, remove_stale_objects)
stale_timer_running = true -- 开启移除日子缓存标识
end
local log_buffer = buffers[conf]
if log_buffer then
log_buffer:push(entry)
return
end
-- 批处理器的执行方法
local func = function(entries, batch_max_size)
local data, err
if conf.concat_method == "json" then
if batch_max_size == 1 then
-- 如果是json格式并且缓存最大值为1,解析数据
data, err = core.json.encode(entries[1]) -- 解析成json串 {}
else
-- 缓存最大值大于1,那么解析所有日志数据为json格式的数组
data, err = core.json.encode(entries)
end
elseif conf.concat_method == "new_line" then
if batch_max_size == 1 then
data, err = core.json.encode(entries[1]) -- 缓存最大值为1,解析为单条普通字符串格式
else
local t = core.table.new(#entries, 0)
for i, entry in ipairs(entries) do
t[i], err = core.json.encode(entry)
if err then
core.log.warn("failed to encode http log: ", err, ", log data: ", entry)
break
end
end
data = core.table.concat(t, "\n") -- 缓存最大值大于1,解析为多条普通字符串格式,多条间换行符隔开
end
else
-- 日志内容的格式只支持json和new_line,设成其他格式则报错
err = "unknown concat_method " .. (conf.concat_method or "nil")
end
--数据解析失败
if not data then
return false, 'error occurred while encoding the data: ' .. err
end
-- 解析完准备发送
return send_postgresql_data(conf, data)
end
--读取一下我们配置插件信息里面对于批处理器的配置
local config = {
name = conf.name,
retry_delay = conf.retry_delay,
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}
local err
log_buffer, err = batch_processor:new(func, config) -- 让batch_processor去执行log的发送
if not log_buffer then
core.log.error("error when creating the batch processor: ", err)
return
end
buffers[conf] = log_buffer
log_buffer:push(entry)
end
return _M
该插件要放在/usr/local/apisix/apisix/plugins下
安装luarocks
apisix以之前博客中的dockerfile镜像构建,里面的apisix已经内置了lua,我们可以不用额外安装lua。但是要安装一下luarocks这个包管理器,用来安装luasql-postgres。
寻找lua头文件和执行文件位置,这些位置需要在安装luarocks时进行指定
find / -name lua.h
find / -name lua
----------
结果:
头文件在/usr/local/openresty/luajit/include/luajit-2.1
执行文件在/usr/bin
配置环境变量,该变量会在luarocks安装时读取,用来确定lua模块安装的位置
vi /etc/profile
LUA_LIB=/usr/local/lib/lua
export LUA_LIB
安装luarocks,--with-lua:lua执行文件位置,不要加/bin(上述找到的位置是/usr/bin),--with-lua-include:头文件所在位置
wget https://luarocks.org/releases/luarocks-2.4.2.tar.gz
tar -zxvf luarocks-2.4.2.tar.gz
cd luarocks-2.4.2
./configure --with-lua-include=/usr/local/openresty/luajit/include/luajit-2.1 --with-lua=/usr
make build && make install
luarocks安装的模块配置的路径,在apisix插件中进行require是可以找得到的,不用担心
安装luasql-postgres
# luasql-postgres 会依赖postgresql的源码和头文件
yum install postgresql-devel –y
# 指定postgresql的源码和头文件位置
luarocks install luasql-postgres PPGSQL_LIBDIR=/usr/lib PGSQL_INCDIR=/usr/include
开启插件并重启apisix
在/usr/local/apisix/conf/config-default.yaml中寻找plugins标签,在子项中添加:
plugins:
- api-breaker
- authz-keycloak
- …
- postgresql-logger
为路由配置postgresql-logger插件
在/usr/local/apisix/conf/apisix.yaml中配置,下面显示了路由的配置,其他配置见之前的博客《apisix的转发配置和插件设置》,该博客就是以yaml配置中心的apisix为例的
routes:
-
uri: /myinfo
upstream_id: 1
plugins:
postgresql-logger: # 以下6个参数是必须的
host: "159.75.26.246" # postgresql服务器ip
port: "5432" # postgresql服务端口
dbname: "test" # postgresql日志表所在的数据库
username: "postgres" # postgresql用户名
password: "xxxxxxxx" # postgresql密码
tablename: "LOG" # postgresql日志表
测试
如图,在postgresql部署在机器的另一个镜像上,成功记录进去
apisix内置插件源码解析
http-logger源码解析
基本每一句都注释了,直接看吧,已经很细了
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local batch_processor = require("apisix.utils.batch-processor")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http")
local url = require("net.url")
local plugin = require("apisix.plugin")
local ngx = ngx
local tostring = tostring
local pairs = pairs
local ipairs = ipairs
local str_byte = string.byte
local timer_at = ngx.timer.at
local plugin_name = "http-logger"
local stale_timer_running = false
local buffers = {}
local lru_log_format = core.lrucache.new({
ttl = 300, count = 512
})
local schema = {
type = "object",
properties = {
uri = core.schema.uri_def,-- http发送日志的url
auth_header = {type = "string", default = ""},-- http连接的鉴权头
timeout = {type = "integer", minimum = 1, default = 3},-- http连接超时时间,秒
name = {type = "string", default = "http logger"},-- batch_processor的参数
max_retry_count = {type = "integer", minimum = 0, default = 0},-- batch_processor的参数
retry_delay = {type = "integer", minimum = 0, default = 1},-- batch_processor的参数
buffer_duration = {type = "integer", minimum = 1, default = 60},-- batch_processor的参数
inactive_timeout = {type = "integer", minimum = 1, default = 5},-- batch_processor的参数
batch_max_size = {type = "integer", minimum = 1, default = 1000},-- batch_processor的参数
include_req_body = {type = "boolean", default = false},
concat_method = {type = "string", default = "json",
enum = {"json", "new_line"}}
},
required = {"uri"}
}
--默认的日志格式
local metadata_schema = {
type = "object",
properties = {
log_format = {
type = "object",
default = {
["host"] = "$host",
["@timestamp"] = "$time_iso8601",
["client_ip"] = "$remote_addr",
},
},
},
additionalProperties = false,
}
--插件元数据
local _M = {
version = 0.1,
priority = 410,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
}
--conf就是我们进行插件配置的信息,这些信息会和schema对比,验证配置的格式合法性和是否缺失等等
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
--发送日志,conf是插件配置
local function send_http_data(conf, log_message)
local err_msg
local res = true
local url_decoded = url.parse(conf.uri)
local host = url_decoded.host
local port = url_decoded.port
--让apisix记录一下日志插件的行为日志
core.log.info("sending a batch logs to ", conf.uri)
--设置端口
if ((not port) and url_decoded.scheme == "https") then
port = 443
elseif not port then
port = 80
end
--创建请求对象
local httpc = http.new()
httpc:set_timeout(conf.timeout * 1000)
local ok, err = httpc:connect(host, port)
--http是否连接成功
if not ok then
return false, "failed to connect to host[" .. host .. "] port["
.. tostring(port) .. "] " .. err
end
--https是否连接成功
if url_decoded.scheme == "https" then
ok, err = httpc:ssl_handshake(true, host, false)
if not ok then
return nil, "failed to perform SSL with host[" .. host .. "] "
.. "port[" .. tostring(port) .. "] " .. err
end
end
--判断内容类型
local content_type
if conf.concat_method == "json" then
content_type = "application/json"
else
content_type = "text/plain"
end
--设置请求头发送日志
local httpc_res, httpc_err = httpc:request({
method = "POST",
path = url_decoded.path,
query = url_decoded.query,
body = log_message,
headers = {
["Host"] = url_decoded.host,
["Content-Type"] = content_type,
["Authorization"] = conf.auth_header
}
})
--发送日志失败
if not httpc_res then
return false, "error while sending data to [" .. host .. "] port["
.. tostring(port) .. "] " .. httpc_err
end
-- 远程服务器出现错误
if httpc_res.status >= 400 then
res = false
err_msg = "server returned status code[" .. httpc_res.status .. "] host["
.. host .. "] port[" .. tostring(port) .. "] "
.. "body[" .. httpc_res:read_body() .. "]"
end
return res, err_msg
end
--生成日志格式,metadata是插件元数据配置
local function gen_log_format(metadata)
local log_format = {}
--如果我们没有设置格式,apisix也没有默认则为空则
if metadata == nil then
return log_format
end
for k, var_name in pairs(metadata.value.log_format) do
if var_name:byte(1, 1) == str_byte("$") then -- $开头则获取环境变量
log_format[k] = {true, var_name:sub(2)}
else
log_format[k] = {false, var_name}
end
end
--向apisix报告日志格式更新的行为
core.log.info("log_format: ", core.json.delay_encode(log_format))
return log_format
end
-- remove stale objects from the memory after timer expires
local function remove_stale_objects(premature)
if premature then
return
end
for key, batch in ipairs(buffers) do
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
core.log.warn("removing batch processor stale object, conf: ",
core.json.delay_encode(key))
buffers[key] = nil
end
end
stale_timer_running = false
end
--apisix最终去执行的是这个方法
function _M.log(conf, ctx)
--获取插件元数据
local metadata = plugin.plugin_metadata(plugin_name)
--apisix记录运行日志
core.log.info("metadata: ", core.json.delay_encode(metadata))
local entry
local log_format = lru_log_format(metadata or "", nil, gen_log_format,
metadata)
if core.table.nkeys(log_format) > 0 then
entry = core.table.new(0, core.table.nkeys(log_format))
for k, var_attr in pairs(log_format) do
if var_attr[1] then
entry[k] = ctx.var[var_attr[2]]
else
entry[k] = var_attr[2]
end
end
local matched_route = ctx.matched_route and ctx.matched_route.value
if matched_route then
entry.service_id = matched_route.service_id
entry.route_id = matched_route.id
end
else
entry = log_util.get_full_log(ngx, conf)
end
if not entry.route_id then
entry.route_id = "no-matched"
end
if not stale_timer_running then
-- run the timer every 30 mins if any log is present
timer_at(1800, remove_stale_objects)
stale_timer_running = true
end
local log_buffer = buffers[conf]
if log_buffer then
log_buffer:push(entry)
return
end
-- 为batch processor创建一个执行方法
local func = function(entries, batch_max_size)
local data, err
if conf.concat_method == "json" then
if batch_max_size == 1 then -- 如果是json格式并且缓存最大值为1,解析数据
data, err = core.json.encode(entries[1]) -- encode as single {}
else -- 缓存最大值大于1,那么解析所有日志数据为json格式的数组
data, err = core.json.encode(entries)
end
elseif conf.concat_method == "new_line" then
if batch_max_size == 1 then
data, err = core.json.encode(entries[1]) -- 缓存最大值为1,解析为单条普通字符串格式
else
local t = core.table.new(#entries, 0)
for i, entry in ipairs(entries) do
t[i], err = core.json.encode(entry)
if err then
core.log.warn("failed to encode http log: ", err, ", log data: ", entry)
break
end
end
data = core.table.concat(t, "\n") -- 缓存最大值大于1,解析为多条普通字符串格式,多条间换行符隔开
end
else
-- defensive programming check
err = "unknown concat_method " .. (conf.concat_method or "nil")
end
--数据解析失败
if not data then
return false, 'error occurred while encoding the data: ' .. err
end
-- 解析完准备发送
return send_http_data(conf, data)
end
--读取一下我们配置插件信息里面对于批处理器的配置
local config = {
name = conf.name,
retry_delay = conf.retry_delay,
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}
-- 让batch_processor去执行log的发送
local err
log_buffer, err = batch_processor:new(func, config)
-- 发送失败
if not log_buffer then
core.log.error("error when creating the batch processor: ", err)
return
end
buffers[conf] = log_buffer
log_buffer:push(entry)
end
return _M
评论区