TensorRT Installation and Deployment

chenming / 2024-12-16

Installing TensorRT

Official documentation: https://docs.nvidia.com/deeplearning/tensorrt/install-guide/index.html#installing-pip

Official samples: https://github.com/NVIDIA/TensorRT/tree/main/samples

Linux

pip install tensorrt-cu11-bindings==10.0.1 tensorrt-cu11-libs==10.0.1 tensorrt_cu11==10.0.1 tensorrt-dispatch-cu11-bindings==10.0.1 tensorrt-dispatch-cu11-libs==10.0.1 tensorrt_dispatch_cu11==10.0.1 tensorrt-lean-cu11-bindings==10.0.1 tensorrt-lean-cu11-libs==10.0.1 tensorrt_lean_cu11==10.0.1

The production environment has a CUDA 11.8 environment isolated with conda, so the cu11 build of TensorRT is installed there. The latest cu11 release at the time of writing is 10.7.0, but that version fails when building the model:

ERROR: [Torch-TensorRT TorchScript Conversion Context] - IBuilder::buildSerializedNetwork: Error Code 9: API Usage Error (Target GPU SM 70 is not supported by this TensorRT release.)

Based on this issue on GitHub, I simply downgraded to 10.0.1, the earliest release in the 10.x major version.
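The error means the target GPU's compute capability (SM 70, i.e. compute capability 7.0, Volta cards such as the V100) is not supported by that TensorRT release. As a quick sanity check before picking a version, the card's SM version can be read out with PyTorch; a minimal sketch, assuming a CUDA-enabled torch build is already present in the environment:

import torch

# Compute capability of GPU 0, e.g. (7, 0) -> "SM 70" on a V100.
major, minor = torch.cuda.get_device_capability(0)
print(f"SM {major}{minor}")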

Windows

The local Windows machine runs CUDA 12.6, so a plain install pulls in tensorrt_cu12 and the rest of the cu12-suffixed packages, mirroring the cu11 ones above.

pip install tensorrt

Other required dependencies

# cuda-python: call the CUDA runtime from Python, used here to allocate memory for tensors
cuda-python==12.2.0; python_version <= "3.10"
cuda-python==12.6.0; python_version >= "3.11"
# Windows additionally requires pywin32
pywin32; platform_system == "Windows"

Verify the installation

import tensorrt
print(tensorrt.__version__)
assert tensorrt.Builder(tensorrt.Logger())

import tensorrt_lean as trt
print(trt.__version__)
assert trt.Runtime(trt.Logger())

import tensorrt_dispatch as trt
print(trt.__version__)
assert trt.Runtime(trt.Logger())

If everything is installed correctly, all three snippets print a version number.
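With the 10.0.1 wheels from the Linux install above, the output looks roughly like this (the exact strings depend on the version actually installed):

10.0.1
10.0.1
10.0.1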

Testing the BOSS new/returning-user model

The specifics are all business-related, so I won't go into them here.
The overall flow (a condensed sketch follows the list, and the full scripts after that):

  • Convert the ONNX model to a TensorRT engine; at this step TensorRT compiles the model with heavy optimization (operator fusion and so on)

  • Create an execution context from the engine

  • Use cuda-python to allocate CUDA memory for the input/output numpy objects; the context later reads input data from, and writes output data back to, these memory pointers

  • Run inference with the context and the set of input/output buffers

  • Free the memory
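Here is a condensed sketch of those five steps with the business-specific parts stripped out. It assumes an engine has already been serialized to disk ("model.engine" is a placeholder path) and reuses the common / common_runtime helper modules listed after the main script below:

import tensorrt as trt

from infer.model_load.tensorrt import common  # helpers shown further down

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)

# 1. Deserialize a previously built engine (placeholder path).
with open("model.engine", "rb") as f:
    engine = trt.Runtime(TRT_LOGGER).deserialize_cuda_engine(f.read())

# 2./3. Create the execution context and allocate pinned host + device buffers.
tensor_names, inputs, outputs, bindings, stream = common.allocate_buffers(engine, profile_idx=0)
context = engine.create_execution_context()

# 4. Copy input data into the pinned host buffers and run inference.
for inp in inputs:
    inp.host[:] = 0  # placeholder: copy the real (flattened) feature vector here
result = common.do_inference(context, engine=engine, bindings=bindings,
                             inputs=inputs, outputs=outputs, stream=stream)
# result is a list of host output arrays, one per output tensor.

# 5. Release host/device memory and the CUDA stream.
common.free_buffers(inputs, outputs, stream)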

import sys
sys.path.append('../../..')

from tqdm import tqdm

from infer.model_load.tensorrt.common_runtime import free_buffers

import onnx
from onnx import shape_inference

from data.source import Source

import numpy as np

import tensorrt as trt
from tensorrt_bindings import IBuilderConfig

from infer.model_load.script_model_loader import get_rk_dssm_train_loader
from infer.model_load.tensorrt import common

TRT_LOGGER = trt.Logger(trt.Logger.VERBOSE)


# The Onnx path is used for Onnx models.
def build_engine_onnx(onnx_model_path: str, save_serialized_engine=False):
    #  Importing a Model Using the ONNX Parser
    builder = trt.Builder(TRT_LOGGER)
    network = builder.create_network(0)  # flags=0: default creation flags (all TensorRT 10 networks are explicit-batch)
    # The next step is to create a build configuration
    # specifying how TensorRT should optimize the model
    config: IBuilderConfig = builder.create_builder_config()
    parser = trt.OnnxParser(network, TRT_LOGGER)

    config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, common.GiB(1))  # cap the builder's scratch workspace at 1 GiB

    # Define an optimization profile
    profile = builder.create_optimization_profile()

    # Infer shapes dynamically
    onnx_model = onnx.load(onnx_model_path)
    inferred_model = shape_inference.infer_shapes(onnx_model)
    # Get input names and their shapes
    input_names = [
        'h5_ip', 'h5_screen', 'h5_os',
        'h5_model', 'h5_browser', 'h5_ua',
        'app_ip', 'app_screen', 'app_os',
        'app_model', 'app_browser', 'app_ua'
    ]
    # Hard-coded for now: only batch size 1 is supported
    min_batch_size = 1
    opt_batch_size = 1
    max_batch_size = 1
    for input_name in input_names:
        if input_name.endswith('_ip'):
            feature_dim = 4
        elif input_name.endswith('_screen'):
            feature_dim = 3
        else:
            feature_dim = 1024
        profile.set_shape(input_name,
                          min=(min_batch_size, feature_dim),
                          opt=(opt_batch_size, feature_dim),
                          max=(max_batch_size, feature_dim))
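    # Note: to support dynamic batch sizes the three shapes above could differ,
    # e.g. min=(1, feature_dim), opt=(8, feature_dim), max=(32, feature_dim);
    # those opt/max values are purely illustrative.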

    # Add the profile to the configuration
    config.add_optimization_profile(profile)

    # Load the Onnx model and parse it in order to populate the TensorRT network.
    with open(onnx_model_path, "rb") as model:
        if not parser.parse(model.read()):
            print("ERROR: Failed to parse the ONNX file.")
            for error in range(parser.num_errors):
                print(parser.get_error(error))
            return None
    # After the configuration has been specified,
    # the engine can be built and serialized with:
    serialized_engine = builder.build_serialized_network(network, config)
    if serialized_engine is None:
        raise RuntimeError("ERROR: Failed to build serialized engine.")
    # It may be useful to save the engine to a file for future use.
    if save_serialized_engine:
        with open(f'{onnx_model_path.split(".")[0]}.engine', 'wb') as f:
            f.write(serialized_engine)
    runtime = trt.Runtime(TRT_LOGGER)
    engine = runtime.deserialize_cuda_engine(serialized_engine)
    if engine is None:
        raise RuntimeError("ERROR: Failed to build engine.")
    return engine


def tensorrt_rk_dssm_infer(source: Source):
    from infer.model_load.onnx_model_loader import get_onnx_model_path
    onnx_model_path = get_onnx_model_path(source=source)
    # Build a TensorRT engine.
    engine = build_engine_onnx(onnx_model_path)
    # Inference is the same regardless of which parser is used to build the engine, since the model architecture is the same.
    # Allocate buffers and create a CUDA stream.
    tensor_names, inputs, outputs, bindings, stream = common.allocate_buffers(engine, profile_idx=0)
    # Contexts are used to perform inference.
    context = engine.create_execution_context()

    # Load a normalized test case into the host input page-locked buffer.
    # test_image = random.choice(test_images)
    # test_case = load_normalized_test_case(test_image, inputs[0].host)
    # Run the engine. The output will be a 1D tensor of length 1000, where each value represents the
    # probability that the image corresponds to that label

    train_loader = get_rk_dssm_train_loader(source, batch_size=1)

    for h5_data, app_data, _, _, _ in tqdm(train_loader, desc='tensorrt'):
        fill_trt_input(h5_data, app_data, tensor_names, inputs)
        trt_outputs = common.do_inference(
            context,
            engine=engine,
            bindings=bindings,
            inputs=inputs,
            outputs=outputs,
            stream=stream,
        )[0]
    free_buffers(inputs, outputs, stream)
    # for h5_data, app_data, _, _, _ in tqdm(train_loader, desc=f'model {dssm_model.__class__}'):
    #     dssm_model(h5_data, app_data)


def fill_trt_input(h5_data, app_data, tensor_names, inputs):
    for name, inp in zip(tensor_names, inputs):
        if name.startswith('h5_'):
            name = name.replace('h5_', '')
            data = h5_data
        else:
            name = name.replace('app_', '')
            data = app_data
        np.copyto(inp.host, data[name].squeeze(0).numpy())


if __name__ == '__main__':
    source = Source.BOSS_NEW_OLD
    tensorrt_rk_dssm_infer(source=source)
    # train_loader = list(get_rk_dssm_train_loader(source))
    # input = random.sample(train_loader, 1)[0]
    # h5_data, app_data = input[0], input[1]
    # def print_size(data):
    #     print('ip', data['ip'].shape)
    #     print('screen', data['screen'].shape)
    #     print('os', data['os'].shape)
    #     print('model', data['model'].shape)
    #     print('browser', data['browser'].shape)
    #     print('ua', data['ua'].shape)
    # print_size(h5_data)
    # print_size(app_data)
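# ---------------------------------------------------------------------------
# common.py -- builder-side helpers imported above as
# infer.model_load.tensorrt.common (based on the NVIDIA TensorRT samples).
# ---------------------------------------------------------------------------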
#
# SPDX-FileCopyrightText: Copyright (c) 1993-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import os

import tensorrt as trt
from infer.model_load.tensorrt.common_runtime import *

try:
    # Sometimes python does not understand FileNotFoundError
    FileNotFoundError
except NameError:
    FileNotFoundError = IOError


def GiB(val):
    return val * 1 << 30


def add_help(description):
    parser = argparse.ArgumentParser(
        description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    args, _ = parser.parse_known_args()


def find_sample_data(
    description="Runs a TensorRT Python sample", subfolder="", find_files=[], err_msg=""
):
    """
    Parses sample arguments.

    Args:
        description (str): Description of the sample.
        subfolder (str): The subfolder containing data relevant to this sample
        find_files (str): A list of filenames to find. Each filename will be replaced with an absolute path.

    Returns:
        str: Path of data directory.
    """

    # Standard command-line arguments for all samples.
    kDEFAULT_DATA_ROOT = os.path.join(os.sep, "usr", "src", "tensorrt", "data")
    parser = argparse.ArgumentParser(
        description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        "-d",
        "--datadir",
        help="Location of the TensorRT sample data directory, and any additional data directories.",
        action="append",
        default=[kDEFAULT_DATA_ROOT],
    )
    args, _ = parser.parse_known_args()

    def get_data_path(data_dir):
        # If the subfolder exists, append it to the path, otherwise use the provided path as-is.
        data_path = os.path.join(data_dir, subfolder)
        if not os.path.exists(data_path):
            if data_dir != kDEFAULT_DATA_ROOT:
                print(
                    "WARNING: "
                    + data_path
                    + " does not exist. Trying "
                    + data_dir
                    + " instead."
                )
            data_path = data_dir
        # Make sure data directory exists.
        if not (os.path.exists(data_path)) and data_dir != kDEFAULT_DATA_ROOT:
            print(
                "WARNING: {:} does not exist. Please provide the correct data path with the -d option.".format(
                    data_path
                )
            )
        return data_path

    data_paths = [get_data_path(data_dir) for data_dir in args.datadir]
    return data_paths, locate_files(data_paths, find_files, err_msg)


def locate_files(data_paths, filenames, err_msg=""):
    """
    Locates the specified files in the specified data directories.
    If a file exists in multiple data directories, the first directory is used.

    Args:
        data_paths (List[str]): The data directories.
        filename (List[str]): The names of the files to find.

    Returns:
        List[str]: The absolute paths of the files.

    Raises:
        FileNotFoundError if a file could not be located.
    """
    found_files = [None] * len(filenames)
    for data_path in data_paths:
        # Find all requested files.
        for index, (found, filename) in enumerate(zip(found_files, filenames)):
            if not found:
                file_path = os.path.abspath(os.path.join(data_path, filename))
                if os.path.exists(file_path):
                    found_files[index] = file_path

    # Check that all files were found
    for f, filename in zip(found_files, filenames):
        if not f or not os.path.exists(f):
            raise FileNotFoundError(
                "Could not find {:}. Searched in data paths: {:}\n{:}".format(
                    filename, data_paths, err_msg
                )
            )
    return found_files


# Sets up the builder to use the timing cache file, and creates it if it does not already exist
def setup_timing_cache(config: trt.IBuilderConfig, timing_cache_path: os.PathLike):
    buffer = b""
    if os.path.exists(timing_cache_path):
        with open(timing_cache_path, mode="rb") as timing_cache_file:
            buffer = timing_cache_file.read()
    timing_cache: trt.ITimingCache = config.create_timing_cache(buffer)
    config.set_timing_cache(timing_cache, True)


# Saves the config's timing cache to file
def save_timing_cache(config: trt.IBuilderConfig, timing_cache_path: os.PathLike):
    timing_cache: trt.ITimingCache = config.get_timing_cache()
    with open(timing_cache_path, "wb") as timing_cache_file:
        timing_cache_file.write(memoryview(timing_cache.serialize()))
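# ---------------------------------------------------------------------------
# common_runtime.py -- CUDA memory and inference helpers imported above as
# infer.model_load.tensorrt.common_runtime (based on the NVIDIA TensorRT samples).
# ---------------------------------------------------------------------------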
#
# SPDX-FileCopyrightText: Copyright (c) 1993-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import ctypes
from typing import Optional, List, Union

import numpy as np
import tensorrt as trt
from cuda import cuda, cudart

def check_cuda_err(err):
    if isinstance(err, cuda.CUresult):
        if err != cuda.CUresult.CUDA_SUCCESS:
            raise RuntimeError("Cuda Error: {}".format(err))
    elif isinstance(err, cudart.cudaError_t):
        if err != cudart.cudaError_t.cudaSuccess:
            raise RuntimeError("Cuda Runtime Error: {}".format(err))
    else:
        raise RuntimeError("Unknown error type: {}".format(err))

def cuda_call(call):
    err, res = call[0], call[1:]
    check_cuda_err(err)
    if len(res) == 1:
        res = res[0]
    return res


class HostDeviceMem:
    """Pair of host and device memory, where the host memory is wrapped in a numpy array"""
    def __init__(self, size: int, dtype: Optional[np.dtype] = None):
        dtype = dtype or np.dtype(np.uint8)
        nbytes = size * dtype.itemsize
        host_mem = cuda_call(cudart.cudaMallocHost(nbytes))
        pointer_type = ctypes.POINTER(np.ctypeslib.as_ctypes_type(dtype))

        self._host = np.ctypeslib.as_array(ctypes.cast(host_mem, pointer_type), (size,))
        self._device = cuda_call(cudart.cudaMalloc(nbytes))
        self._nbytes = nbytes

    @property
    def host(self) -> np.ndarray:
        return self._host

    @host.setter
    def host(self, data: Union[np.ndarray, bytes]):
        if isinstance(data, np.ndarray):
            if data.size > self.host.size:
                raise ValueError(
                    f"Tried to fit an array of size {data.size} into host memory of size {self.host.size}"
                )
            np.copyto(self.host[:data.size], data.flat, casting='safe')
        else:
            assert self.host.dtype == np.uint8
            self.host[:self.nbytes] = np.frombuffer(data, dtype=np.uint8)

    @property
    def device(self) -> int:
        return self._device

    @property
    def nbytes(self) -> int:
        return self._nbytes

    def __str__(self):
        return f"Host:\n{self.host}\nDevice:\n{self.device}\nSize:\n{self.nbytes}\n"

    def __repr__(self):
        return self.__str__()

    def free(self):
        cuda_call(cudart.cudaFree(self.device))
        cuda_call(cudart.cudaFreeHost(self.host.ctypes.data))


# Allocates all buffers required for an engine, i.e. host/device inputs/outputs.
# If engine uses dynamic shapes, specify a profile to find the maximum input & output size.
def allocate_buffers(engine: trt.ICudaEngine, profile_idx: Optional[int] = None):
    inputs = []
    outputs = []
    bindings = []
    stream = cuda_call(cudart.cudaStreamCreate())
    # tensor_names holds the names of the engine's input and output tensors,
    # determined by the input/output names chosen when the model was exported to ONNX
    tensor_names = [engine.get_tensor_name(i) for i in range(engine.num_io_tensors)]
    for binding in tensor_names:
        # get_tensor_profile_shape returns (min_shape, optimal_shape, max_shape)
        # Pick out the max shape to allocate enough memory for the binding.
        shape = engine.get_tensor_shape(binding) if profile_idx is None else engine.get_tensor_profile_shape(binding, profile_idx)[-1]
        # Every dimension must be non-negative (no unresolved dynamic dims)
        shape_valid = np.all([s >= 0 for s in shape])
        if not shape_valid and profile_idx is None:
            raise ValueError(f"Binding {binding} has dynamic shape, " +\
                "but no profile was specified.")
        size = trt.volume(shape)
        trt_type = engine.get_tensor_dtype(binding)

        # Allocate host and device buffers
        try:
            dtype = np.dtype(trt.nptype(trt_type))
            bindingMemory = HostDeviceMem(size, dtype)
        except TypeError: # no numpy support: create a byte array instead (BF16, FP8, INT4)
            size = int(size * trt_type.itemsize)
            bindingMemory = HostDeviceMem(size)

        # Append the device buffer to device bindings.
        bindings.append(int(bindingMemory.device))

        # Append to the appropriate list.
        if engine.get_tensor_mode(binding) == trt.TensorIOMode.INPUT:
            inputs.append(bindingMemory)
        else:
            outputs.append(bindingMemory)
    return tensor_names, inputs, outputs, bindings, stream


# Frees the resources allocated in allocate_buffers
def free_buffers(inputs: List[HostDeviceMem], outputs: List[HostDeviceMem], stream: cudart.cudaStream_t):
    for mem in inputs + outputs:
        mem.free()
    cuda_call(cudart.cudaStreamDestroy(stream))


# Wrapper for cudaMemcpy which infers copy size and does error checking
def memcpy_host_to_device(device_ptr: int, host_arr: np.ndarray):
    nbytes = host_arr.size * host_arr.itemsize
    cuda_call(cudart.cudaMemcpy(device_ptr, host_arr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyHostToDevice))

# Wrapper for cudaMemcpy which infers copy size and does error checking
def memcpy_device_to_host(host_arr: np.ndarray, device_ptr: int):
    nbytes = host_arr.size * host_arr.itemsize
    cuda_call(cudart.cudaMemcpy(host_arr, device_ptr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost))


def _do_inference_base(inputs, outputs, stream, execute_async_func):
    # Transfer input data to the GPU.
    kind = cudart.cudaMemcpyKind.cudaMemcpyHostToDevice
    [cuda_call(cudart.cudaMemcpyAsync(inp.device, inp.host, inp.nbytes, kind, stream)) for inp in inputs]
    # Run inference.
    execute_async_func()
    # Transfer predictions back from the GPU.
    kind = cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost
    [cuda_call(cudart.cudaMemcpyAsync(out.host, out.device, out.nbytes, kind, stream)) for out in outputs]
    # Synchronize the stream
    cuda_call(cudart.cudaStreamSynchronize(stream))
    # Return only the host outputs.
    return [out.host for out in outputs]


# This function is generalized for multiple inputs/outputs.
# inputs and outputs are expected to be lists of HostDeviceMem objects.
def do_inference(context, engine, bindings, inputs, outputs, stream):
    def execute_async_func():
        context.execute_async_v3(stream_handle=stream)
    # Setup context tensor address.
    num_io = engine.num_io_tensors
    for i in range(num_io):
        context.set_tensor_address(engine.get_tensor_name(i), bindings[i])
    return _do_inference_base(inputs, outputs, stream, execute_async_func)

Performance benchmark

Overall, taking the DSSM model above as an example with batch size 1, TensorRT sustains the highest concurrency by a wide margin and performs extremely well:

  • Native torch: 141 concurrent requests

  • torch.compile: 182 concurrent requests

  • TorchScript: 222 concurrent requests

  • onnxruntime-gpu: 218 concurrent requests

  • TensorRT: 490 concurrent requests
