标签搜索

目 录CONTENT

文章目录

Hologres查询时长预测

陈铭
2023-12-29 / 0 评论 / 0 点赞 / 64 阅读 / 4,293 字 / 正在检测是否收录...

背景

  • hologres业务高峰期,大量来自CEM的任务容易造成OOM
  • CEM无法识别sql复杂程度,执行时长不可控
  • CEM没有sql调度中心,无法对sql查询任务进行动态管理

样本集

原始来源

来着于hologres的历史查询记录可以获得可用样本

  • 只获取查询成功的SELECT语句
  • 只获取开头为WITH的复杂查询,多为分析sql的临时表sql结构
  • 只获取来自测试环境和生产环境的sql
SELECT
    COUNT(1) as count
FROM hologres.hg_query_log
WHERE
  command_tag='SELECT'
  AND status = 'SUCCESS'
  AND datname='ry_cdm_hl'
  AND client_addr in ('xx.xx.xx.xx','xx.xx.xx.xx')
  AND query like 'WITH%'

特征工程

由历史记录可以获得具体执行的sql以及执行时长,这本身就是样本对(sql–>duration)。由于执行时长(duration)是个数值,样本构造仅需要对sql语句进行处理

Explain

explain <sql> 获取执行计划,解析执行计划顺序构造成二叉树,二叉树节点为执行计划节点的扫描函数。
执行计划是从下到上进行计算sql的,这里是hologres单节点执行sql的情况,构建出的二叉树是个极不平衡树(只会有左节点)。有些复杂sql可以构建出左右子树存在的平衡二叉树

Sort  (cost=0.00..31806.28 rows=926859 width=30)
  Sort Key: (CASE WHEN ("_$title" IS NULL) THEN 'NULL'::text ELSE "_$title" END), (FINAL pg_catalog.count((PARTIAL pg_catalog.count(1)))) DESC
  ->  Gather  (cost=0.00..555.17 rows=926859 width=30) 
        ->  Final HashAggregate  (cost=0.00..487.63 rows=926859 width=30)
              Group Key: (CASE WHEN ("_$title" IS NULL) THEN 'NULL'::text ELSE "_$title" END), "_$event"
              ->  Redistribution  (cost=0.00..379.01 rows=5088503 width=30) 
                    Hash Key: (CASE WHEN ("_$title" IS NULL) THEN 'NULL'::text ELSE "_$title" END), "_$event"
                    ->  Local Gather  (cost=0.00..355.12 rows=5088503 width=30)
                          ->  Decode  (cost=0.00..354.60 rows=5088503 width=30)
                                ->  Partial HashAggregate  (cost=0.00..354.50 rows=5088503 width=30)
                                      Group Key: (CASE WHEN ("_$title" IS NULL) THEN 'NULL'::text ELSE "_$title" END), "_$event"
                                      ->  Filter  (cost=0.00..205.43 rows=27936128 width=22)
                                            Filter: ((CASE WHEN ("_$title" IS NULL) THEN 'NULL'::text ELSE "_$title" END) <> ''::text)
                                            ->  Project  (cost=0.00..159.47 rows=27936128 width=22)
                                                  ->  Seq Scan on Partitioned Table dwd_log_cem_event_enet  (cost=0.00..127.74 rows=35277948 width=27)
                                                        Partitions selected: 7 out of 161
                                                        Filter: ((ds >= 20231115) AND (ds <= 20231121))
                                ->  Partial HashAggregate  (cost=0.00..354.50 rows=5088503 width=30)
                                      Group Key: (CASE WHEN ("_$title" IS NULL) THEN 'NULL'::text ELSE "_$title" END), "_$event"
                                      ->  Filter  (cost=0.00..205.43 rows=27936128 width=22)
                                            Filter: ((CASE WHEN ("_$title" IS NULL) THEN 'NULL'::text ELSE "_$title" END) <> ''::text)
                                            ->  Project  (cost=0.00..159.47 rows=27936128 width=22)
                                                  ->  Seq Scan on Partitioned Table dwd_log_cem_event_enet  (cost=0.00..127.74 rows=35277948 width=27)
                                                        Partitions selected: 7 out of 161
                                                        Filter: ((ds >= 20231115) AND (ds <= 20231121))
Optimizer: HQO version 2.0.0    
Optimizer: HQO version 2.0.0

构造Tensor

一颗二叉树代表了sql的执行计划,也间接代表了sql的复杂程度(由执行计划的顺序和扫描行数共同决定)
对于二叉树–>Tensor,这边主要是由这棵树的前中后序遍历结果进行构造,因为前中后序遍历决定当且仅当的一棵树

普通特征矩阵

这边构造了两种Tensor,一种是普通特征矩阵[batch_size,500,3]

  • 维度1(500):前中后序遍历的最大长度,目前所有sql的前中后序遍历加起来长度也就00出头,500的容量完全够用(遍历长度不足500,补0.0)
  • 维度2(3):二叉树节点保存的rows、width以及算子名称。目前样本中sql执行计划出现过的算子最多20多个,直接映射成0-20+的数值即可(编码)
import re
from queue import LifoQueue

from sql_time_pred.holo.entity.history import History
from sql_time_pred.holo.entity.operator_node import OperatorNode


def refine_plan(plan):
    return [p for p in plan if "cost=" in p and "rows=" in p and "width=" in p]


def head_space_count(text):
    pattern = r'^\s+'
    # 使用正则表达式找到匹配项
    match = re.search(pattern, text)
    # 如果找到匹配项,则计算空格的数量
    if match:
        count_spaces = len(match.group())
    else:
        count_spaces = 0
    return count_spaces


def parse_plan(duration: int, plan: list[str]):
    # 去除无用信息
    plan = refine_plan(plan)
    root_line = plan[0]
    root = OperatorNode(root_line)
    root_space_count = head_space_count(root_line)
    root.level = root_space_count
    root.duration = duration

    stack = LifoQueue()
    indent_stack = LifoQueue()
    stack.put(root)
    indent_stack.put(root_space_count)

    for i in range(1, len(plan)):
        line = plan[i]
        space_count = head_space_count(line)
        node = OperatorNode(line)
        node.level = space_count
        node.duration = duration

        if space_count > indent_stack.queue[-1]:
            stack.queue[-1].left_node = node
            stack.put(node)
            indent_stack.put(space_count)
        else:
            while space_count <= indent_stack.queue[-1]:
                stack.get()
                indent_stack.get()
            stack.queue[-1].right_node = node
            stack.put(node)
            indent_stack.put(space_count)

    return root


def build_nodes(histories: list[History]) -> list[OperatorNode]:
    plans = [(his.duration, his.plan) for his in histories]
    return [parse_plan(duration, plan) for duration, plan in plans]


def validate_parse_nodes(histories: list[History]):
    nodes = build_nodes(histories)
    assert len(nodes) == len(histories)
    for i in range(len(histories)):
        plan0 = []
        print_node(nodes[i], plan0)
        plan = histories[i].plan
        plan = refine_plan(plan)
        print(plan == plan0)


def print_node(root: OperatorNode, plan: list[str]):
    if root == None:
        return
    plan.append(root.message)
    print_node(root.left_node, plan)
    print_node(root.right_node, plan)
Image(主要用于后续的CNN方案)

前中后序遍历也仅仅是用节点顺序表达一棵树,是否可以构造出更为直观保留树结构和节点信息的Tensor?
对一棵树的遍历映射成二维特征矩阵,前中后序遍历可以获得一个Image[batch_size,3,500,500]

  • 维度1(3):前中后序遍历
  • 维度2、3(500):容纳树的遍历最多用500*500的Tensor保存,同上,也是够用的
100, 0, 0,   0, 0,   0
200, 0, 200, 0, 0,   0
300, 0, 300, 0, 300, 0
文本编码

除去计算机语言特性,sql语句本身就是文本,完全可以用NLP的方式进行编码

import random
import re

import torch
from nltk.tokenize import word_tokenize

# 示例文本
text = '''
with temp_event_table as(
select
   case
      when cast(event_cem._$title as text) is null then 'NULL'
      else cast(event_cem._$title as text)
   end as group_cem,
   event_cem.ds as ds_cem,
   event_cem._$event as _$event_cem,
   event_cem._$time as _$time_cem,
   event_cem._$one_id as _$one_id_cem
from
   ry_cdm.test_dwd_log_cem_event_enet event_cem
where
   (event_cem.ds >= 20231111
      and event_cem.ds <= 20231124)
   and ((event_cem._$event = '$AppStart')
      or (event_cem._$event = '$AppEnd')) ),
temp_bool_table as(
select
   temp_event_table.group_cem,
   temp_event_table._$one_id_cem,
   DATE(TO_TIMESTAMP(temp_event_table._$time_cem / 1000)) as date_cem,
   case
      when temp_event_table._$time_cem >= 1699632000000
         and temp_event_table._$time_cem <= 1700236800000
         and (temp_event_table._$event_cem = '$AppStart') then true
         else false
      end as start_cem,
      case
         when temp_event_table._$time_cem >= 1699718400000
            and temp_event_table._$time_cem <= 1700841600000
            and (temp_event_table._$event_cem = '$AppEnd') then true
            else false
         end as end_cem
      from
         temp_event_table ),
temp_retention_table as(
select
   temp_bool_table.group_cem,
   temp_bool_table._$one_id_cem,
   RANGE_RETENTION_COUNT(temp_bool_table.start_cem,
   temp_bool_table.end_cem,
   temp_bool_table.date_cem,
   array[1,
   2,
   3,
   4,
   5,
   6,
   7],
   'day',
   'normal') as retained_detail_cem
from
   temp_bool_table
group by
   temp_bool_table.group_cem,
   temp_bool_table._$one_id_cem ),
temp_merge_table as(
select
   temp_retention_table.group_cem,
   temp_retention_table._$one_id_cem,
   REGEXP_SPLIT_TO_ARRAY(unnest(RANGE_RETENTION_SUM(temp_retention_table.retained_detail_cem)), ',')as retained_cem
from
   temp_retention_table
group by
   temp_retention_table.group_cem,
   temp_retention_table._$one_id_cem ),
temp_split_table as(
select
   temp_merge_table.group_cem,
   temp_merge_table._$one_id_cem,
   temp_merge_table.retained_cem[1] as first_day_cem,
   temp_merge_table.retained_cem[2] as init_people_cem,
   temp_merge_table.retained_cem[3] as retention_1_cem,
   temp_merge_table.retained_cem[4] as retention_2_cem,
   temp_merge_table.retained_cem[5] as retention_3_cem,
   temp_merge_table.retained_cem[6] as retention_4_cem,
   temp_merge_table.retained_cem[7] as retention_5_cem,
   temp_merge_table.retained_cem[8] as retention_6_cem,
   temp_merge_table.retained_cem[9] as retention_7_cem,
   temp_merge_table.retained_cem[10] as retention_8_cem,
   temp_merge_table.retained_cem[11] as retention_9_cem,
   temp_merge_table.retained_cem[12] as retention_10_cem,
   temp_merge_table.retained_cem[13] as retention_11_cem,
   temp_merge_table.retained_cem[14] as retention_12_cem,
   temp_merge_table.retained_cem[15] as retention_13_cem,
   temp_merge_table.retained_cem[16] as retention_14_cem,
   temp_merge_table.retained_cem[17] as retention_15_cem
from
   temp_merge_table )
select
   distinct temp_split_table._$one_id_cem as _$one_id,
   account_cem._$login_id,
   account_cem.account_id,
   account_cem.account_tenant_id,
   account_cem.account_company_id,
   account_cem.account_name,
   account_cem.account_phone_decrypt,
   account_cem.is_real_name_account,
   account_cem.account_identy_type,
   account_cem.account_l1_zone,
   account_cem.account_l2_zone,
   account_cem.account_nation,
   account_cem.account_provnce,
   account_cem.account_city,
   account_cem.account_first_visit_app_time,
   account_cem.account_first_visit_macc_time,
   account_cem.account_last_visit_app_time,
   account_cem.account_last_visit_macc_time,
   account_cem.use_duratn,
   account_cem.use_cnt,
   account_cem.account_type,
   account_cem.account_registr_date,
   account_cem.account_active_date,
   account_cem.account_repurce_date,
   account_cem.account_loyal_date,
   account_cem.account_valid_proj_cnt,
   account_cem.account_rui_proj_cnt,
   account_cem.account_yi_proj_cnt,
   account_cem.account_home_proj_cnt,
   account_cem.account_device_cnt,
   account_cem.account_rui_device_cnt,
   account_cem.account_yi_device_cnt,
   account_cem.account_home_device_cnt,
   account_cem.account_proj_sum_price,
   account_cem.account_ry_proj_sum_price,
   account_cem.account_first_rui_proj_online_date,
   account_cem.account_first_yi_proj_online_date,
   account_cem.account_certify_enginer_type,
   account_cem.account_company_name,
   account_cem.account_company_role_type,
   account_cem.company_owner_account_name,
   account_cem.company_class,
   account_cem.company_employe_cnt,
   account_cem.company_certify_enginer_cnt,
   account_cem.company_proj_cnt,
   account_cem.company_scheme_cnt,
   account_cem.company_device_cnt,
   account_cem.case_total_cnt,
   account_cem.case_cnt_rct_1m,
   account_cem.case_cnt_rct_6m,
   account_cem.case_cnt_rct_1y,
   account_cem._$carrier,
   account_cem._$manufacturer,
   account_cem._$model,
   account_cem._$browser,
   account_cem._$browser_version,
   account_cem._$os,
   account_cem._$os_version,
   account_cem._$screen_height,
   account_cem._$screen_width
from
   ry_ads.ads_log_cem_user_account_label_wt_enet account_cem
right join temp_split_table on
   temp_split_table._$one_id_cem = account_cem._$one_id
where
   temp_split_table.init_people_cem::INTEGER >= 1
   and temp_split_table.group_cem = ''
   and temp_split_table.first_day_cem = '20231111'
limit 15 offset 0
'''
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

text = re.sub(r'\s+', ' ', text)
# 分词
tokens = word_tokenize(text)

# 构建词汇表
vocab = sorted(list(set(tokens)))

# 随机打乱词汇表顺序
seed = 213422
random.seed(seed)
random.shuffle(vocab)

word_to_index = {word: i for i, word in enumerate(vocab)}

# 将文本转换为索引序列
indexed_tokens = [word_to_index[word] for word in tokens]

# 将索引序列转换为张量
tensor = torch.tensor(indexed_tokens)
tensor = tensor.to(device)

# 打印结果
print(tensor)

但这种编码方案十分依赖足量样本,以保证样本的泛化足够
举例:
这两句sql文本上就差了个数字2,文本编码出来的Tensor十分类似,但是两者执行复杂度天差地别。样本很少时,使用文本编码,会造成严重的样本倾斜

-- 简单sql,查询7天数据
SELECT * FROM ry_cdm.dwd_log_cem_event_enet
WHERE ds >= 20231115 and event_cem.ds <= 20231121
-- 复杂sql,查询一年数据
SELECT * FROM ry_cdm.dwd_log_cem_event_enet
WHERE ds >= 20221115 and event_cem.ds <= 20231121

样本倾斜

目前样本中的执行时长大部分集中在1s左右,这就造成大量样本对的输出基本就是1秒多,训练出来的模型由于样本倾斜,也只会输出1s左右的预测。
用log函数对其进行离散化

self.duration = [math.log(x + math.e, math.e) for x in self.duration]

离散化前的执行时长分布
image-1703842811747
离散化后的执行时长分布
image-1703842815879

标准化

主要针对sql转换成的Tensor进行标准化,执行计划中行数大的可达几百亿,小的只有个位数,标准化可以避免大数值影响梯度下降

def norm(self, tensor):
    tensor = torch.log(tensor + 1)  # 添加1以避免log(0)
    # 标准化(零均值,单位方差)
    mean = tensor.mean()
    std = tensor.std(dim=0) + 0.0001
    tensor = (tensor - mean) / std
    return mean, std, tensor

模型

特征矩阵欧式距离

原理

对现有样本的sql都构造成Image,即三维特征矩阵。任意的sql都可以构造Image,和样本的Image进行相似度比对。矩阵相似度计算方式有很多,这里用最小的欧氏距离获取样本中最相似的Image,进而获得对应样本的执行时长。

优点

  • 模型不需要训练,不需要额外的GPU资源
  • 易于实现,执行效率高
    ##¥ 缺点
  • 精度差,受限于样本过少,即使匹配到最相似的样本,其欧氏距离仍旧很大,获得的执行时长并不准

CNN

原理

对于Image样本进行CNN分类任务。全连接层直接预测时长
image-1703843184668

优点

  • 实现简单,CNN会帮你处理好数据中的结构信息

缺点

  • Image中存在大量0数据,数据过于稀疏,梯度下降困难
  • 样本过少,预测时长不够准确

LSTM+ResNet3/5

这里的LSTM用了8层,输出的维度由原来3扩充为了32

import torch
from torch import nn
from torch.nn import functional

from sql_time_pred.dataset.sample_builder import max_seq_length, vector_size

use_res_net = True


class ResBlock(nn.Module):
    def __init__(self, ic, oc, class_num):
        super(ResBlock, self).__init__()

        self.ic = ic
        self.oc = oc
        self.class_num = class_num

        self.linear1 = nn.Linear(ic, oc)
        self.bn1 = nn.BatchNorm1d(class_num)

        self.linear2 = nn.Linear(oc, oc)
        self.bn2 = nn.BatchNorm1d(class_num)

        # 短接回路
        self.extra = nn.Sequential()
        if oc != ic:
            self.extra = nn.Sequential(
                nn.Linear(ic, oc),
                nn.BatchNorm1d(class_num)
            )

    def forward(self, x):
        # 计算残差块的卷积输出
        out = functional.relu(self.bn1(self.linear1(x)))
        out = self.bn2(self.linear2(out))

        # 计算短接的输出并相加
        out = self.extra(x) + out
        out = functional.relu(out)
        return out


class LSTM(nn.Module):

    def __init__(self, class_num: int,
                 hidden_size: int,
                 num_layers: int,
                 res_layer_num: int):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.class_num = class_num
        # [batchSize,max_seq_length,vectorSize] --> [batchSize,max_seq_length,hidden_size]
        self.lstm = nn.LSTM(
            input_size=vector_size,
            hidden_size=self.hidden_size,  # hidden_layer的数目,即输出的维度
            num_layers=num_layers,
            batch_first=True,  # 输入数据的维度一般是(batch, squence, vector),该属性表征batch是否放在第一个维度
        )
        self.seq = nn.Sequential(
            nn.Linear(int(max_seq_length * self.hidden_size / self.class_num), 1024), nn.ReLU(),
            nn.Linear(1024, 512), nn.ReLU(),
            nn.Linear(512, 128), nn.ReLU(),
            nn.Linear(128, 1), nn.ReLU(), )

        assert res_layer_num == 3 or res_layer_num == 5
        channels = [512, 256, 128, 64, 32][::int(3 - (res_layer_num - 1) / 2)]
        channels = [int(max_seq_length * self.hidden_size / self.class_num)] + channels
        self.res_seq = nn.Sequential()
        for i in range(len(channels) - 1):
            self.res_seq.add_module("res_net_" + str(i), ResBlock(channels[i], channels[i + 1], class_num))
            self.res_seq.add_module("relu_" + str(i), nn.ReLU(), )
        self.fc = nn.Sequential(
            nn.Linear(32, 16), nn.ReLU(),
            nn.Linear(16, 1), nn.ReLU(),
        )

    def forward(self, x):
        output, h_c = self.lstm(x)
        output = torch.reshape(output, (-1, self.class_num, int(output.shape[1] * output.shape[2] / self.class_num)))

        if use_res_net:
            output = self.res_seq(output)
        else:
            output = self.seq(output)
        output = self.fc(output)
        # [1,20,1]
        output = torch.reshape(output, (-1, self.class_num))
        # [1,20]
        return output


if __name__ == '__main__':
    l = [512, 256, 128, 64, 32][:1]
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    randn = torch.randn(1, 500, 3).to(device)
    net = LSTM(20).to(device)
    net1 = net(randn)
    print()

优点

  • sql执行计划可以视作时序信息,本质上代表了hologres的运行顺序,由LSTM提取特征比较合适
  • 精度最高,当然模型参数量也最多
  • 全连接层由多层线性层转为了多个ResNet残差块
  • 由原先的预测执行时长转为分类任务,在分类数比较多的时候,同样可以预测细粒度的时长
  • 多中分类数量可以训练多个模型,做集成学习

缺点

  • 系统复杂度提升,模型代码量较多
  • 提升模型维护门槛

模型测试

50分类

以训练5000轮的结果为例,训练参数如下

训练参数

[
    {
        "cls_weight":false,
        "res_layer_num":5,
        "hidden_size":32,
        "num_layers":5,
        "batch_size": 256,
        "epochs": 5000,
        "class_num": 50,
        "learn_ratio": 0.001
    }
]

训练基准

image-1703843297474
image-1703843308137

测试基准

在0-10s内准确度在80%以上
image-1703843316362
image-1703843324432

  • R:真实的执行时长
  • M:模型预测时长

样本离散化失效了?
并不是,交叉熵损失设置权重,仅仅是帮助训练时梯度下降的更快,尽量让模型学习稀疏部分的执行时长。
离散化也是在帮助梯度下降,样本倾斜的事实并没有改变,只是把集中在1-10s的样本离散在更大的数学空间上了。
样本质量决定了模型的上限,模型最好的性能也只能体现在了1-10s区间的预测了。
image-1703843338051

80分类

以训练1000轮的结果为例,训练参数如下

训练参数

[
    {
        "cls_weight":false,
        "res_layer_num":5,
        "hidden_size":32,
        "num_layers":5,
        "batch_size": 256,
        "epochs": 1000,
        "class_num": 80,
        "learn_ratio": 0.001
    }
]

训练基准

image-1703843359716
image-1703843364705

测试基准

在0-10s内准确度在70%以上
image-1703843370114
image-1703843382898

  • R:真实的执行时长
  • M:模型预测时长

集成学习

集成40、50、62、80分类的四个模型进行基准测试
image-1703843401772
image-1703843415104

改进了CEM什么?

队列系统,sql好好排队,合理打向数仓
20231229-175719

现存问题

  • 样本不足
  • holo即将升配
0

评论区