
Developing a postgresql-logger plugin for apisix, and a source-code analysis of the built-in http-logger plugin

陈铭
2021-07-20

Developing the postgresql-logger plugin

Installing Lua

Whether you install apisix yourself from a centos image or use the official docker image directly, Lua comes preinstalled. However, to make sure the postgresql module we later install with luarocks does not end up in the wrong location, it is best to reinstall Lua.

Add environment variables

vi /etc/profile
# append the following at the end of the file
LUA_MAJOR_VERSION=5.3
export LUA_MAJOR_VERSION
LUA_MINOR_VERSION=4
export LUA_MINOR_VERSION
LUA_VERSION=5.3.4
export LUA_VERSION
WITH_LUA=/usr/local/
export WITH_LUA
LUA_LIB=/usr/local/lib/lua
export LUA_LIB
LUA_INCLUDE=/usr/local/include
export LUA_INCLUDE
LUAROCKS_VERSION=2.4.2
export LUAROCKS_VERSION
LUAROCKS_INSTALL=luarocks-2.4.2
export LUAROCKS_INSTALL
TMP_LOC=/tmp/luarocks
export TMP_LOC

# after saving the file, reload it so the variables take effect in the current shell
source /etc/profile

Build and install Lua

yum install -y wget make gcc readline-devel unzip zip git
wget http://www.lua.org/ftp/lua-5.3.4.tar.gz
tar -zxvf lua-5.3.4.tar.gz && cd lua-5.3.4
make linux test && make install
cd .. && rm -rf lua-5.3.4*   # clean up the source tree and tarball

Plugin source code

The plugin can be downloaded from Gitee.


-- this plugin was developed on top of apisix's built-in http-logger plugin, and much of its control flow follows http-logger's

local batch_processor = require("apisix.utils.batch-processor")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http") -- carried over from http-logger; not used by this plugin
local url = require("net.url") -- carried over from http-logger; not used by this plugin
local plugin = require("apisix.plugin")

local ngx = ngx
local tostring = tostring
local pairs = pairs
local ipairs = ipairs
local str_byte = string.byte
local timer_at = ngx.timer.at

local plugin_name = "postgresql-logger"
local stale_timer_running = false
local buffers = {}
local lru_log_format = core.lrucache.new({
    ttl = 300, count = 512
})
-- pull in the luasql postgres driver (installed via luarocks below)
local luasql = require "luasql.postgres"

local schema = {
    type = "object",
    properties = {
        host = { type = "string" }, -- postgresql server ip
        port = { type = "string" }, -- postgresql server port
        dbname = { type = "string" }, -- database that holds the log table
        username = { type = "string" }, -- postgresql username
        password = { type = "string" }, -- postgresql password
        tablename = { type = "string" }, -- postgresql log table
        uri = core.schema.uri_def,
        auth_header = { type = "string", default = "" },
        timeout = { type = "integer", minimum = 1, default = 3 },
        name = { type = "string", default = "postgresql-logger" },-- batch_processor parameter
        max_retry_count = { type = "integer", minimum = 0, default = 0 },-- batch_processor parameter
        retry_delay = { type = "integer", minimum = 0, default = 1 },-- batch_processor parameter
        buffer_duration = { type = "integer", minimum = 1, default = 60 },-- batch_processor parameter
        inactive_timeout = { type = "integer", minimum = 1, default = 5 },-- batch_processor parameter
        batch_max_size = { type = "integer", minimum = 1, default = 1000 },-- batch_processor parameter
        include_req_body = { type = "boolean", default = false },
        concat_method = { type = "string", default = "json",
                          enum = { "json", "new_line" } }
    },
    required = { "host", "port", "dbname", "username", "password", "tablename" }
    -- the parameters the plugin requires; when apisix starts it validates every plugin's
    -- configuration, and if a plugin's config is invalid the forwarding it is attached to stops working
}

-- log-format metadata
local metadata_schema = {
    type = "object",
    properties = {
        log_format = {
            type = "object",
            default = {
                ["host"] = "$host",
                ["@timestamp"] = "$time_iso8601",
                ["client_ip"] = "$remote_addr",
                ["route_id"] = "$route_id",
                ["route_name"] = "$route_id",
                ["service_id"] = "$route_id",
                ["service_name"] = "$route_id",
                ["client_ip"] = "$remote_addr",
            },
        },
    },
    additionalProperties = false,
}

-- plugin metadata
local _M = {
    version = 0.1,
    priority = 410,
    name = plugin_name,
    schema = schema,
    metadata_schema = metadata_schema,
}
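
-- priority 410 matches http-logger; apisix executes plugins within a phase in
-- descending priority order, so this logger fires at the same point http-logger would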

--conf is the configuration we wrote for the plugin; it is compared against the schema to validate its format and check for missing fields
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end

--send a log message to postgresql; conf is the plugin configuration
local function send_postgresql_data(conf, log_message)
    local err_msg  -- error message
    local res = true -- whether the send succeeded

    -- create the luasql environment
    local env = luasql.postgres()

    if (nil == env) then
        res = false;
        err_msg = "luasql-postgres environment failure";
        return res ,err_msg;
    end

    -- port is a string in the schema, so convert it to a number with tonumber(conf.port)
    local conn = env:connect(conf.dbname, conf.username, conf.password, conf.host, tonumber(conf.port));

    if (nil == conn) then
        res = false;
        err_msg = "connect postgresql server failure";
        env:close();
        return res ,err_msg;
    end

    local resultLine = conn:execute("INSERT INTO "..conf.tablename.." (log) VALUES ('"..log_message.."');")

    -- if the number of inserted rows is not 1, the insert failed
    if (1.0 ~= resultLine) then
        res = false;
        err_msg = "insert into "..conf.tablename.." failure in "..conf.dbname.." of database";
        conn:close();
        env:close();
        return res ,err_msg;
    end

    -- close the resources
    conn:close();
    env:close();
    return res, err_msg
end
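
-- a hardening note: log_message is spliced into the SQL string verbatim above, so a
-- message containing a single quote breaks the INSERT. LuaSQL connections expose an
-- escape() method; a safer variant (a sketch, not the original logic) would be:
--   local safe = conn:escape(log_message)
--   conn:execute("INSERT INTO " .. conf.tablename .. " (log) VALUES ('" .. safe .. "');")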

--build the log format; metadata is the plugin metadata configuration
local function gen_log_format(metadata)
    local log_format = {}
    --if we did not configure a format and apisix has no default, return an empty table
    if metadata == nil then
        return log_format
    end

    for k, var_name in pairs(metadata.value.log_format) do
        if var_name:byte(1, 1) == str_byte("$") then
            -- a leading $ means the value is resolved as an nginx variable at log time
            log_format[k] = { true, var_name:sub(2) }
        else
            log_format[k] = { false, var_name }
        end
    end
    --report the rebuilt log format in apisix's info log
    core.log.info("log_format: ", core.json.delay_encode(log_format))
    return log_format
end
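
-- an illustrative example of the transformation above: with
--   metadata.value.log_format = { host = "$host", app = "my-app" }
-- gen_log_format returns
--   { host = {true, "host"}, app = {false, "my-app"} }
-- where {true, name} means "look up ctx.var[name] per request" and
-- {false, value} means "use the literal value as-is"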


-- remove stale log buffers; the cache is essentially a table, and this keeps memory usage from growing too large
local function remove_stale_objects(premature)
    if premature then
        return
    end

    for key, batch in pairs(buffers) do -- buffers is keyed by conf tables, so pairs (not ipairs) is needed to actually visit them
        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
            core.log.warn("removing batch processor stale object, conf: ",
                    core.json.delay_encode(key))
            buffers[key] = nil -- set to nil to release the data
        end
    end

    stale_timer_running = false -- reset the flag so the cleanup timer can be scheduled again
end

--this is the function apisix ultimately calls to record a log entry
function _M.log(conf, ctx)
    --fetch the plugin metadata
    local metadata = plugin.plugin_metadata(plugin_name)
    --have apisix write a runtime log entry
    core.log.info("metadata: ", core.json.delay_encode(metadata))

    local entry
    -- look up the log format (cached via lrucache)
    local log_format = lru_log_format(metadata or "", nil, gen_log_format,
            metadata)
    -- use that log format to build the log entry
    if core.table.nkeys(log_format) > 0 then
        entry = core.table.new(0, core.table.nkeys(log_format))
        for k, var_attr in pairs(log_format) do
            if var_attr[1] then
                entry[k] = ctx.var[var_attr[2]]
            else
                entry[k] = var_attr[2]
            end
        end

        local matched_route = ctx.matched_route and ctx.matched_route.value
        if matched_route then
            entry.service_id = matched_route.service_id
            entry.route_id = matched_route.id
        end
    else
        entry = log_util.get_full_log(ngx, conf)
    end

    -- entries for requests that matched no route
    if not entry.route_id then
        entry.route_id = "no-matched"
    end

    -- schedule the stale-buffer cleanup?
    if not stale_timer_running then
        -- 30 minutes later, remove buffers that have gone idle
        timer_at(1800, remove_stale_objects)
        stale_timer_running = true -- mark the cleanup timer as running
    end

    local log_buffer = buffers[conf]

    if log_buffer then
        log_buffer:push(entry)
        return
    end

    -- the function the batch processor will execute
    local func = function(entries, batch_max_size)
        local data, err

        if conf.concat_method == "json" then
            if batch_max_size == 1 then
                -- json format with a batch size of 1: encode the single entry
                data, err = core.json.encode(entries[1]) -- encoded as one json object {}
            else
                -- batch size greater than 1: encode all entries as a json array
                data, err = core.json.encode(entries)
            end

        elseif conf.concat_method == "new_line" then
            if batch_max_size == 1 then
                data, err = core.json.encode(entries[1]) -- batch size 1: encode as a single plain string
            else
                local t = core.table.new(#entries, 0)
                for i, entry in ipairs(entries) do
                    t[i], err = core.json.encode(entry)
                    if err then
                        core.log.warn("failed to encode http log: ", err, ", log data: ", entry)
                        break
                    end
                end
                data = core.table.concat(t, "\n") -- batch size > 1: one encoded entry per line, newline-separated
            end

        else
            -- only json and new_line are supported; any other value is an error
            err = "unknown concat_method " .. (conf.concat_method or "nil")
        end

        --encoding failed
        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end
        -- encoded successfully, hand it off for sending
        return send_postgresql_data(conf, data)
    end
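
    -- illustrative (assuming batch_max_size > 1): with two buffered entries
    -- {a=1} and {b=2}, func produces
    --   concat_method = "json"     -> data = '[{"a":1},{"b":2}]'
    --   concat_method = "new_line" -> data = '{"a":1}\n{"b":2}'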

    --read the batch-processor settings out of our plugin configuration
    local config = {
        name = conf.name,
        retry_delay = conf.retry_delay,
        batch_max_size = conf.batch_max_size,
        max_retry_count = conf.max_retry_count,
        buffer_duration = conf.buffer_duration,
        inactive_timeout = conf.inactive_timeout,
        route_id = ctx.var.route_id,
        server_addr = ctx.var.server_addr,
    }

    local err
    log_buffer, err = batch_processor:new(func, config) -- hand off to batch_processor, which drives the actual sending


    if not log_buffer then
        core.log.error("error when creating the batch processor: ", err)
        return
    end

    buffers[conf] = log_buffer
    log_buffer:push(entry)
end

return _M

The plugin file must be placed under /usr/local/apisix/apisix/plugins.

Installing luarocks

This apisix instance is built from the dockerfile image described in an earlier post, and it already bundles Lua, so we do not strictly need to install Lua again. We do need to install the luarocks package manager, which we will use to install luasql-postgres.

Find the locations of the Lua header files and the Lua executable; these paths have to be specified when installing luarocks:

find / -name lua.h
find / -name lua
----------
Result:
the header files are under /usr/local/openresty/luajit/include/luajit-2.1
the executable is under /usr/bin

Configure the environment variable below; it is read while luarocks is being installed and determines where Lua modules get installed:

vi /etc/profile
LUA_LIB=/usr/local/lib/lua
export LUA_LIB

Install luarocks. --with-lua is the location of the Lua executable without the trailing /bin (the path found above was /usr/bin, so pass /usr); --with-lua-include is the location of the header files:

wget https://luarocks.org/releases/luarocks-2.4.2.tar.gz
tar -zxvf luarocks-2.4.2.tar.gz 
cd luarocks-2.4.2
./configure --with-lua-include=/usr/local/openresty/luajit/include/luajit-2.1 --with-lua=/usr  
make build && make install

The paths luarocks installs modules to are resolvable from apisix plugins via require, so there is nothing extra to worry about.

Installing luasql-postgres

# luasql-postgres depends on the postgresql libraries and headers
yum install -y postgresql-devel
# point luarocks at the postgresql libraries and headers
luarocks install luasql-postgres PGSQL_LIBDIR=/usr/lib PGSQL_INCDIR=/usr/include
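
Before wiring the plugin into apisix, it is worth confirming that the module actually resolves. The following is a minimal standalone sanity check, run with the lua binary installed above; the connection values are the same placeholders used in the route config below, so adjust them to your setup:

-- check.lua: standalone sanity check for luasql-postgres
local luasql = require "luasql.postgres" -- fails here if luarocks put the module out of reach
local env = assert(luasql.postgres(), "luasql-postgres environment failure")
local conn, err = env:connect("test", "postgres", "xxxxxxxx", "159.75.26.246", 5432)
assert(conn, err)
print("connected ok")
conn:close()
env:close()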

Enable the plugin and restart apisix

In /usr/local/apisix/conf/config-default.yaml, find the plugins key and add an entry under it:

plugins:
  - api-breaker
  - authz-keycloak
  - …
  - postgresql-logger

Configuring the postgresql-logger plugin on a route

Configure it in /usr/local/apisix/conf/apisix.yaml. The route configuration is shown below; for the remaining configuration see the earlier post 《apisix的转发配置和插件设置》, which uses exactly this yaml-config-center style of apisix as its example.

routes:
  -
    uri: /myinfo
    upstream_id: 1
    plugins:
      postgresql-logger: # the following 6 parameters are required
        host: "159.75.26.246" # postgresql server ip
        port: "5432" # postgresql server port
        dbname: "test" # database that holds the log table
        username: "postgres" # postgresql username
        password: "xxxxxxxx" # postgresql password
        tablename: "LOG" # postgresql log table

Testing

As shown below, with postgresql deployed in another container on the host, the log entry was recorded successfully.

[screenshot: the log rows recorded in the postgresql table]

Source-code analysis of apisix's built-in plugin

http-logger source analysis

Nearly every line is annotated; the comments below are detailed enough to read straight through.

--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements.  See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License.  You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local batch_processor = require("apisix.utils.batch-processor")
local log_util        = require("apisix.utils.log-util")
local core            = require("apisix.core")
local http            = require("resty.http")
local url             = require("net.url")
local plugin          = require("apisix.plugin")

local ngx      = ngx
local tostring = tostring
local pairs    = pairs
local ipairs   = ipairs
local str_byte = string.byte
local timer_at = ngx.timer.at

local plugin_name = "http-logger"
local stale_timer_running = false
local buffers = {}
local lru_log_format = core.lrucache.new({
    ttl = 300, count = 512
})

local schema = {
    type = "object",
    properties = {
        uri = core.schema.uri_def,-- the url logs are sent to over http
        auth_header = {type = "string", default = ""},-- auth header for the http connection
        timeout = {type = "integer", minimum = 1, default = 3},-- http connection timeout, in seconds
        name = {type = "string", default = "http logger"},-- batch_processor parameter
        max_retry_count = {type = "integer", minimum = 0, default = 0},-- batch_processor parameter
        retry_delay = {type = "integer", minimum = 0, default = 1},-- batch_processor parameter
        buffer_duration = {type = "integer", minimum = 1, default = 60},-- batch_processor parameter
        inactive_timeout = {type = "integer", minimum = 1, default = 5},-- batch_processor parameter
        batch_max_size = {type = "integer", minimum = 1, default = 1000},-- batch_processor parameter
        include_req_body = {type = "boolean", default = false},
        concat_method = {type = "string", default = "json",
                         enum = {"json", "new_line"}}
    },
    required = {"uri"}
}
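
-- minimal valid conf for this schema (illustrative): only uri is required, e.g.
--   { uri = "http://127.0.0.1:8080/logs" }
-- every other field falls back to the defaults declared above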

--the default log format
local metadata_schema = {
    type = "object",
    properties = {
        log_format = {
            type = "object",
            default = {
                ["host"] = "$host",
                ["@timestamp"] = "$time_iso8601",
                ["client_ip"] = "$remote_addr",
            },
        },
    },
    additionalProperties = false,
}

--plugin metadata
local _M = {
    version = 0.1,
    priority = 410,
    name = plugin_name,
    schema = schema,
    metadata_schema = metadata_schema,
}

--conf is the configuration we wrote for the plugin; it is compared against the schema to validate its format and check for missing fields
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end

--send the logs; conf is the plugin configuration
local function send_http_data(conf, log_message)
    local err_msg
    local res = true
    local url_decoded = url.parse(conf.uri)
    local host = url_decoded.host
    local port = url_decoded.port

    --have apisix log what the logger plugin is doing
    core.log.info("sending a batch logs to ", conf.uri)

    --determine the port
    if ((not port) and url_decoded.scheme == "https") then
        port = 443
    elseif not port then
        port = 80
    end

    --create the request object
    local httpc = http.new()
    httpc:set_timeout(conf.timeout * 1000)
    local ok, err = httpc:connect(host, port)

    --did the http connection succeed?
    if not ok then
        return false, "failed to connect to host[" .. host .. "] port["
            .. tostring(port) .. "] " .. err
    end

    --for https, did the ssl handshake succeed?
    if url_decoded.scheme == "https" then
        ok, err = httpc:ssl_handshake(true, host, false)
        if not ok then
            return nil, "failed to perform SSL with host[" .. host .. "] "
                .. "port[" .. tostring(port) .. "] " .. err
        end
    end

    --decide the content type
    local content_type
    if conf.concat_method == "json" then
        content_type = "application/json"
    else
        content_type = "text/plain"
    end

    --set the request headers and send the logs
    local httpc_res, httpc_err = httpc:request({
        method = "POST",
        path = url_decoded.path,
        query = url_decoded.query,
        body = log_message,
        headers = {
            ["Host"] = url_decoded.host,
            ["Content-Type"] = content_type,
            ["Authorization"] = conf.auth_header
        }
    })

    --sending the logs failed
    if not httpc_res then
        return false, "error while sending data to [" .. host .. "] port["
            .. tostring(port) .. "] " .. httpc_err
    end

    -- the remote server returned an error
    if httpc_res.status >= 400 then
        res =  false
        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
            .. host .. "] port[" .. tostring(port) .. "] "
            .. "body[" .. httpc_res:read_body() .. "]"
    end

    return res, err_msg
end
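
-- an illustrative walk-through: with uri = "https://logs.example.com/ingest?src=apisix"
-- (a hypothetical endpoint), the code above connects to logs.example.com:443, performs
-- the TLS handshake, and POSTs the encoded batch to /ingest?src=apisix with the
-- Content-Type chosen from concat_method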

--build the log format; metadata is the plugin metadata configuration
local function gen_log_format(metadata)
    local log_format = {}
    --if we did not configure a format and apisix has no default, return an empty table
    if metadata == nil then
        return log_format
    end

    for k, var_name in pairs(metadata.value.log_format) do
        if var_name:byte(1, 1) == str_byte("$") then -- a leading $ means the value is resolved as an nginx variable
            log_format[k] = {true, var_name:sub(2)}
        else
            log_format[k] = {false, var_name}
        end
    end
    --report the rebuilt log format in apisix's info log
    core.log.info("log_format: ", core.json.delay_encode(log_format))
    return log_format
end


-- remove stale objects from the memory after timer expires
local function remove_stale_objects(premature)
    if premature then
        return
    end

    for key, batch in ipairs(buffers) do -- note: buffers is keyed by conf tables, which ipairs never visits, so this loop does not actually reach them
        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
            core.log.warn("removing batch processor stale object, conf: ",
                          core.json.delay_encode(key))
            buffers[key] = nil
        end
    end

    stale_timer_running = false
end

--this is the function apisix ultimately calls
function _M.log(conf, ctx)
    --fetch the plugin metadata
    local metadata = plugin.plugin_metadata(plugin_name)
    --have apisix write a runtime log entry
    core.log.info("metadata: ", core.json.delay_encode(metadata))

    local entry
    local log_format = lru_log_format(metadata or "", nil, gen_log_format,
                                      metadata)
    if core.table.nkeys(log_format) > 0 then
        entry = core.table.new(0, core.table.nkeys(log_format))
        for k, var_attr in pairs(log_format) do
            if var_attr[1] then
                entry[k] = ctx.var[var_attr[2]]
            else
                entry[k] = var_attr[2]
            end
        end

        local matched_route = ctx.matched_route and ctx.matched_route.value
        if matched_route then
            entry.service_id = matched_route.service_id
            entry.route_id = matched_route.id
        end
    else
        entry = log_util.get_full_log(ngx, conf)
    end

    if not entry.route_id then
        entry.route_id = "no-matched"
    end

    if not stale_timer_running then
        -- run the timer every 30 mins if any log is present
        timer_at(1800, remove_stale_objects)
        stale_timer_running = true
    end

    local log_buffer = buffers[conf]

    if log_buffer then
        log_buffer:push(entry)
        return
    end

    -- create the execution function for the batch processor
    local func = function(entries, batch_max_size)
        local data, err

        if conf.concat_method == "json" then
            if batch_max_size == 1 then -- json format with a batch size of 1: encode the single entry
                data, err = core.json.encode(entries[1]) -- encode as single {}
            else  -- batch size greater than 1: encode all entries as a json array
                data, err = core.json.encode(entries)
            end

        elseif conf.concat_method == "new_line" then
            if batch_max_size == 1 then
                data, err = core.json.encode(entries[1]) -- batch size 1: encode as a single plain string
            else
                local t = core.table.new(#entries, 0)
                for i, entry in ipairs(entries) do
                    t[i], err = core.json.encode(entry)
                    if err then
                        core.log.warn("failed to encode http log: ", err, ", log data: ", entry)
                        break
                    end
                end
                data = core.table.concat(t, "\n") -- batch size > 1: one encoded entry per line, newline-separated
            end

        else
            -- defensive programming check
            err = "unknown concat_method " .. (conf.concat_method or "nil")
        end

        --encoding failed
        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end
        -- encoded successfully, hand it off for sending
        return send_http_data(conf, data)
    end

    --read the batch-processor settings out of our plugin configuration
    local config = {
        name = conf.name,
        retry_delay = conf.retry_delay,
        batch_max_size = conf.batch_max_size,
        max_retry_count = conf.max_retry_count,
        buffer_duration = conf.buffer_duration,
        inactive_timeout = conf.inactive_timeout,
        route_id = ctx.var.route_id,
        server_addr = ctx.var.server_addr,
    }

    -- hand off to batch_processor, which drives the actual sending
    local err
    log_buffer, err = batch_processor:new(func, config)

    -- creating the batch processor failed
    if not log_buffer then
        core.log.error("error when creating the batch processor: ", err)
        return
    end

    buffers[conf] = log_buffer
    log_buffer:push(entry)
end


return _M
