Installing TensorRT
Official documentation: https://docs.nvidia.com/deeplearning/tensorrt/install-guide/index.html#installing-pip
Official samples: https://github.com/NVIDIA/TensorRT/tree/main/samples
Linux
pip install tensorrt-cu11-bindings==10.0.1 tensorrt-cu11-libs==10.0.1 tensorrt_cu11==10.0.1 tensorrt-dispatch-cu11-bindings==10.0.1 tensorrt-dispatch-cu11-libs==10.0.1 tensorrt_dispatch_cu11==10.0.1 tensorrt-lean-cu11-bindings==10.0.1 tensorrt-lean-cu11-libs==10.0.1 tensorrt_lean_cu11==10.0.1
The production environment uses conda to isolate a CUDA 11.8 environment, so the cu11 flavour of TensorRT is installed. The latest cu11 release at the time was 10.7.0, but that version throws an error when building the model:
ERROR: [Torch-TensorRT TorchScript Conversion Context] - IBuilder::buildSerializedNetwork: Error Code 9: API Usage Error (Target GPU SM 70 is not supported by this TensorRT release.)
Together with a related issue on GitHub, I simply downgraded to 10.0.1, the earliest release of the 10.x line.
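For context, the "SM 70" in the error refers to Volta-class GPUs (e.g. V100), which the newest TensorRT 10.x releases no longer support, while 10.0.1 still does. To check which compute capability the local GPU actually has, the cuda-python package listed below can be used (a minimal sketch, assuming device 0):
from cuda import cudart
# Query the compute capability of GPU 0 via the CUDA runtime API
_, major = cudart.cudaDeviceGetAttribute(cudart.cudaDeviceAttr.cudaDevAttrComputeCapabilityMajor, 0)
_, minor = cudart.cudaDeviceGetAttribute(cudart.cudaDeviceAttr.cudaDevAttrComputeCapabilityMinor, 0)
print(f"SM {major}{minor}")  # e.g. SM 70 is the Volta generation hit by the error above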
Windows
The local Windows machine runs CUDA 12.6, so a plain install pulls in tensorrt_cu12 and the corresponding set of cu12-suffixed packages analogous to the ones above.
pip install tensorrt
Other required dependencies
# Calls into CUDA from Python; used to allocate memory for tensors
cuda-python==12.2.0; python_version <= "3.10"
cuda-python==12.6.0; python_version >= "3.11"
# Additionally required on Windows
pywin32; platform_system == "Windows"
Verify the installation
import tensorrt
print(tensorrt.__version__)
assert tensorrt.Builder(tensorrt.Logger())
import tensorrt_lean as trt
print(trt.__version__)
assert trt.Runtime(trt.Logger())
import tensorrt_dispatch as trt
print(trt.__version__)
assert trt.Runtime(trt.Logger())
If the installation succeeded, each of the three snippets prints its version number (the full, lean, and dispatch runtimes, respectively).
Testing the BOSS new/old-user model
I won't go into the specifics here, since they are all business-related.
Overall flow (a condensed sketch follows this list):
Convert the ONNX model to an engine; at this point TensorRT does heavy optimization while compiling the model (operator fusion and so on)
Create an execution context from the engine
Use cuda-python to allocate CUDA memory for the input/output numpy arrays; the context later reads input data from, and writes output data back to, these memory pointers
Run inference with the context and the prepared inputs/outputs
Free the memory
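As a condensed sketch of that flow (relying on build_engine_onnx and the common/common_runtime helpers defined further down; "model.onnx" is only a placeholder path):
import numpy as np
from infer.model_load.tensorrt import common
from infer.model_load.tensorrt.common_runtime import free_buffers
engine = build_engine_onnx("model.onnx")                 # 1. ONNX -> optimized TensorRT engine
context = engine.create_execution_context()              # 2. execution context
# 3. page-locked host buffers, device buffers and a CUDA stream
tensor_names, inputs, outputs, bindings, stream = common.allocate_buffers(engine, profile_idx=0)
# fill each inputs[i].host with the sample's features (see fill_trt_input further down)
trt_outputs = common.do_inference(context, engine=engine, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream)  # 4. copy inputs host->device, execute, copy outputs device->host
free_buffers(inputs, outputs, stream)                    # 5. release device/host memory and the stream
The full test script: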
import sys
sys.path.append('../../..')
from tqdm import tqdm
from infer.model_load.tensorrt.common_runtime import free_buffers
import onnx
from onnx import shape_inference
from data.source import Source
import numpy as np
import tensorrt as trt
from tensorrt_bindings import IBuilderConfig
from infer.model_load.script_model_loader import get_rk_dssm_train_loader
from infer.model_load.tensorrt import common
TRT_LOGGER = trt.Logger(trt.Logger.VERBOSE)
# Build a TensorRT engine from an ONNX model file.
def build_engine_onnx(onnx_model_path: str, save_serialized_engine=False):
# Importing a Model Using the ONNX Parser
builder = trt.Builder(TRT_LOGGER)
network = builder.create_network(0)
# The next step is to create a build configuration
# specifying how TensorRT should optimize the model
config: IBuilderConfig = builder.create_builder_config()
parser = trt.OnnxParser(network, TRT_LOGGER)
config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, common.GiB(1))
# Define an optimization profile
profile = builder.create_optimization_profile()
# Run ONNX shape inference (note: inferred_model is not used below; input names and feature dims are hardcoded instead)
onnx_model = onnx.load(onnx_model_path)
inferred_model = shape_inference.infer_shapes(onnx_model)
# Get input names and their shapes
input_names = [
'h5_ip', 'h5_screen', 'h5_os',
'h5_model', 'h5_browser', 'h5_ua',
'app_ip', 'app_screen', 'app_os',
'app_model', 'app_browser', 'app_ua'
]
# Hard-coded for now: only batch size 1 is supported
min_batch_size = 1
opt_batch_size = 1
max_batch_size = 1
for input_name in input_names:
if input_name.endswith('_ip'):
feature_dim = 4
elif input_name.endswith('_screen'):
feature_dim = 3
else:
feature_dim = 1024
profile.set_shape(input_name,
min=(min_batch_size, feature_dim),
opt=(opt_batch_size, feature_dim),
max=(max_batch_size, feature_dim))
# Add the profile to the configuration
config.add_optimization_profile(profile)
# Load the Onnx model and parse it in order to populate the TensorRT network.
with open(onnx_model_path, "rb") as model:
if not parser.parse(model.read()):
print("ERROR: Failed to parse the ONNX file.")
for error in range(parser.num_errors):
print(parser.get_error(error))
return None
# After the configuration has been specified,
# the engine can be built and serialized with:
serialized_engine = builder.build_serialized_network(network, config)
if serialized_engine is None:
raise RuntimeError("ERROR: Failed to build serialized engine.")
# It may be useful to save the engine to a file for future use.
if save_serialized_engine:
with open(f'{onnx_model_path.split(".")[0]}.engine', 'wb') as f:
f.write(serialized_engine)
runtime = trt.Runtime(TRT_LOGGER)
engine = runtime.deserialize_cuda_engine(serialized_engine)
if engine is None:
raise RuntimeError("ERROR: Failed to build engine.")
return engine
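# Note: building the engine from ONNX is slow. If save_serialized_engine=True was used above, later runs
# can skip the rebuild and deserialize the saved plan instead (sketch; the path below is illustrative):
#   runtime = trt.Runtime(TRT_LOGGER)
#   with open("path/to/model.engine", "rb") as f:
#       engine = runtime.deserialize_cuda_engine(f.read())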
def tensorrt_rk_dssm_infer(source: Source):
from infer.model_load.onnx_model_loader import get_onnx_model_path
onnx_model_path = get_onnx_model_path(source=source)
# Build a TensorRT engine.
engine = build_engine_onnx(onnx_model_path)
# Inference is the same regardless of which parser is used to build the engine, since the model architecture is the same.
# Allocate buffers and create a CUDA stream.
tensor_names, inputs, outputs, bindings, stream = common.allocate_buffers(engine, profile_idx=0)
# Contexts are used to perform inference.
context = engine.create_execution_context()
# Iterate over the data loader; for each sample, copy its features into the
# page-locked host input buffers and run the engine.
train_loader = get_rk_dssm_train_loader(source, batch_size=1)
for h5_data, app_data, _, _, _ in tqdm(train_loader, desc='tensorrt'):
fill_trt_input(h5_data, app_data, tensor_names, inputs)
trt_outputs = common.do_inference(
context,
engine=engine,
bindings=bindings,
inputs=inputs,
outputs=outputs,
stream=stream,
)[0]
free_buffers(inputs, outputs, stream)
# for h5_data, app_data, _, _, _ in tqdm(train_loader, desc=f'model {dssm_model.__class__}'):
# dssm_model(h5_data, app_data)
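# Copies one sample's features into the page-locked host input buffers.
# Tensor names carry an h5_/app_ prefix that selects the corresponding feature dict;
# zip() stops at len(inputs), so only the leading (input) entries of tensor_names are consumed.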
def fill_trt_input(h5_data, app_data, tensor_names, inputs):
for name, inp in zip(tensor_names, inputs):
if name.startswith('h5_'):
name = name.replace('h5_', '')
data = h5_data
else:
name = name.replace('app_', '')
data = app_data
np.copyto(inp.host, data[name].squeeze(0).numpy())
if __name__ == '__main__':
source = Source.BOSS_NEW_OLD
tensorrt_rk_dssm_infer(source=source)
# train_loader = list(get_rk_dssm_train_loader(source))
# input = random.sample(train_loader, 1)[0]
# h5_data, app_data = input[0], input[1]
# def print_size(data):
# print('ip', data['ip'].shape)
# print('screen', data['screen'].shape)
# print('os', data['os'].shape)
# print('model', data['model'].shape)
# print('browser', data['browser'].shape)
# print('ua', data['ua'].shape)
# print_size(h5_data)
# print_size(app_data)
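The helper module infer.model_load.tensorrt.common imported above is taken from the official TensorRT Python samples (it also re-exports everything from common_runtime via a star import):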
#
# SPDX-FileCopyrightText: Copyright (c) 1993-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import os
import tensorrt as trt
from infer.model_load.tensorrt.common_runtime import *
try:
# Sometimes python does not understand FileNotFoundError
FileNotFoundError
except NameError:
FileNotFoundError = IOError
def GiB(val):
return val * 1 << 30
def add_help(description):
parser = argparse.ArgumentParser(
description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
args, _ = parser.parse_known_args()
def find_sample_data(
description="Runs a TensorRT Python sample", subfolder="", find_files=[], err_msg=""
):
"""
Parses sample arguments.
Args:
description (str): Description of the sample.
subfolder (str): The subfolder containing data relevant to this sample
find_files (str): A list of filenames to find. Each filename will be replaced with an absolute path.
Returns:
str: Path of data directory.
"""
# Standard command-line arguments for all samples.
kDEFAULT_DATA_ROOT = os.path.join(os.sep, "usr", "src", "tensorrt", "data")
parser = argparse.ArgumentParser(
description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"-d",
"--datadir",
help="Location of the TensorRT sample data directory, and any additional data directories.",
action="append",
default=[kDEFAULT_DATA_ROOT],
)
args, _ = parser.parse_known_args()
def get_data_path(data_dir):
# If the subfolder exists, append it to the path, otherwise use the provided path as-is.
data_path = os.path.join(data_dir, subfolder)
if not os.path.exists(data_path):
if data_dir != kDEFAULT_DATA_ROOT:
print(
"WARNING: "
+ data_path
+ " does not exist. Trying "
+ data_dir
+ " instead."
)
data_path = data_dir
# Make sure data directory exists.
if not (os.path.exists(data_path)) and data_dir != kDEFAULT_DATA_ROOT:
print(
"WARNING: {:} does not exist. Please provide the correct data path with the -d option.".format(
data_path
)
)
return data_path
data_paths = [get_data_path(data_dir) for data_dir in args.datadir]
return data_paths, locate_files(data_paths, find_files, err_msg)
def locate_files(data_paths, filenames, err_msg=""):
"""
Locates the specified files in the specified data directories.
If a file exists in multiple data directories, the first directory is used.
Args:
data_paths (List[str]): The data directories.
filename (List[str]): The names of the files to find.
Returns:
List[str]: The absolute paths of the files.
Raises:
FileNotFoundError if a file could not be located.
"""
found_files = [None] * len(filenames)
for data_path in data_paths:
# Find all requested files.
for index, (found, filename) in enumerate(zip(found_files, filenames)):
if not found:
file_path = os.path.abspath(os.path.join(data_path, filename))
if os.path.exists(file_path):
found_files[index] = file_path
# Check that all files were found
for f, filename in zip(found_files, filenames):
if not f or not os.path.exists(f):
raise FileNotFoundError(
"Could not find {:}. Searched in data paths: {:}\n{:}".format(
filename, data_paths, err_msg
)
)
return found_files
# Sets up the builder to use the timing cache file, and creates it if it does not already exist
def setup_timing_cache(config: trt.IBuilderConfig, timing_cache_path: os.PathLike):
buffer = b""
if os.path.exists(timing_cache_path):
with open(timing_cache_path, mode="rb") as timing_cache_file:
buffer = timing_cache_file.read()
timing_cache: trt.ITimingCache = config.create_timing_cache(buffer)
config.set_timing_cache(timing_cache, True)
# Saves the config's timing cache to file
def save_timing_cache(config: trt.IBuilderConfig, timing_cache_path: os.PathLike):
timing_cache: trt.ITimingCache = config.get_timing_cache()
with open(timing_cache_path, "wb") as timing_cache_file:
timing_cache_file.write(memoryview(timing_cache.serialize()))
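And the runtime helpers, infer.model_load.tensorrt.common_runtime, likewise from the official samples; this is where HostDeviceMem, allocate_buffers, do_inference and free_buffers are defined: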
#
# SPDX-FileCopyrightText: Copyright (c) 1993-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import ctypes
from typing import Optional, List, Union
import numpy as np
import tensorrt as trt
from cuda import cuda, cudart
def check_cuda_err(err):
if isinstance(err, cuda.CUresult):
if err != cuda.CUresult.CUDA_SUCCESS:
raise RuntimeError("Cuda Error: {}".format(err))
elif isinstance(err, cudart.cudaError_t):
if err != cudart.cudaError_t.cudaSuccess:
raise RuntimeError("Cuda Runtime Error: {}".format(err))
else:
raise RuntimeError("Unknown error type: {}".format(err))
def cuda_call(call):
err, res = call[0], call[1:]
check_cuda_err(err)
if len(res) == 1:
res = res[0]
return res
class HostDeviceMem:
"""Pair of host and device memory, where the host memory is wrapped in a numpy array"""
def __init__(self, size: int, dtype: Optional[np.dtype] = None):
dtype = dtype or np.dtype(np.uint8)
nbytes = size * dtype.itemsize
host_mem = cuda_call(cudart.cudaMallocHost(nbytes))
pointer_type = ctypes.POINTER(np.ctypeslib.as_ctypes_type(dtype))
self._host = np.ctypeslib.as_array(ctypes.cast(host_mem, pointer_type), (size,))
self._device = cuda_call(cudart.cudaMalloc(nbytes))
self._nbytes = nbytes
@property
def host(self) -> np.ndarray:
return self._host
@host.setter
def host(self, data: Union[np.ndarray, bytes]):
if isinstance(data, np.ndarray):
if data.size > self.host.size:
raise ValueError(
f"Tried to fit an array of size {data.size} into host memory of size {self.host.size}"
)
np.copyto(self.host[:data.size], data.flat, casting='safe')
else:
assert self.host.dtype == np.uint8
self.host[:self.nbytes] = np.frombuffer(data, dtype=np.uint8)
@property
def device(self) -> int:
return self._device
@property
def nbytes(self) -> int:
return self._nbytes
def __str__(self):
return f"Host:\n{self.host}\nDevice:\n{self.device}\nSize:\n{self.nbytes}\n"
def __repr__(self):
return self.__str__()
def free(self):
cuda_call(cudart.cudaFree(self.device))
cuda_call(cudart.cudaFreeHost(self.host.ctypes.data))
# Allocates all buffers required for an engine, i.e. host/device inputs/outputs.
# If engine uses dynamic shapes, specify a profile to find the maximum input & output size.
def allocate_buffers(engine: trt.ICudaEngine, profile_idx: Optional[int] = None):
inputs = []
outputs = []
bindings = []
stream = cuda_call(cudart.cudaStreamCreate())
# tensor_names holds the names of the model's input and output tensors,
# determined by the input/output names specified when the model was exported to ONNX
tensor_names = [engine.get_tensor_name(i) for i in range(engine.num_io_tensors)]
for binding in tensor_names:
# get_tensor_profile_shape returns (min_shape, optimal_shape, max_shape)
# Pick out the max shape to allocate enough memory for the binding.
shape = engine.get_tensor_shape(binding) if profile_idx is None else engine.get_tensor_profile_shape(binding, profile_idx)[-1]
# All dimensions must be non-negative (-1 marks a dynamic dimension)
shape_valid = np.all([s >= 0 for s in shape])
if not shape_valid and profile_idx is None:
raise ValueError(f"Binding {binding} has dynamic shape, " +\
"but no profile was specified.")
size = trt.volume(shape)
trt_type = engine.get_tensor_dtype(binding)
# Allocate host and device buffers
try:
dtype = np.dtype(trt.nptype(trt_type))
bindingMemory = HostDeviceMem(size, dtype)
except TypeError: # no numpy support: create a byte array instead (BF16, FP8, INT4)
size = int(size * trt_type.itemsize)
bindingMemory = HostDeviceMem(size)
# Append the device buffer to device bindings.
bindings.append(int(bindingMemory.device))
# Append to the appropriate list.
if engine.get_tensor_mode(binding) == trt.TensorIOMode.INPUT:
inputs.append(bindingMemory)
else:
outputs.append(bindingMemory)
return tensor_names, inputs, outputs, bindings, stream
# Frees the resources allocated in allocate_buffers
def free_buffers(inputs: List[HostDeviceMem], outputs: List[HostDeviceMem], stream: cudart.cudaStream_t):
for mem in inputs + outputs:
mem.free()
cuda_call(cudart.cudaStreamDestroy(stream))
# Wrapper for cudaMemcpy which infers copy size and does error checking
def memcpy_host_to_device(device_ptr: int, host_arr: np.ndarray):
nbytes = host_arr.size * host_arr.itemsize
cuda_call(cudart.cudaMemcpy(device_ptr, host_arr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyHostToDevice))
# Wrapper for cudaMemcpy which infers copy size and does error checking
def memcpy_device_to_host(host_arr: np.ndarray, device_ptr: int):
nbytes = host_arr.size * host_arr.itemsize
cuda_call(cudart.cudaMemcpy(host_arr, device_ptr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost))
def _do_inference_base(inputs, outputs, stream, execute_async_func):
# Transfer input data to the GPU.
kind = cudart.cudaMemcpyKind.cudaMemcpyHostToDevice
[cuda_call(cudart.cudaMemcpyAsync(inp.device, inp.host, inp.nbytes, kind, stream)) for inp in inputs]
# Run inference.
execute_async_func()
# Transfer predictions back from the GPU.
kind = cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost
[cuda_call(cudart.cudaMemcpyAsync(out.host, out.device, out.nbytes, kind, stream)) for out in outputs]
# Synchronize the stream
cuda_call(cudart.cudaStreamSynchronize(stream))
# Return only the host outputs.
return [out.host for out in outputs]
# This function is generalized for multiple inputs/outputs.
# inputs and outputs are expected to be lists of HostDeviceMem objects.
def do_inference(context, engine, bindings, inputs, outputs, stream):
def execute_async_func():
context.execute_async_v3(stream_handle=stream)
# Setup context tensor address.
num_io = engine.num_io_tensors
for i in range(num_io):
context.set_tensor_address(engine.get_tensor_name(i), bindings[i])
return _do_inference_base(inputs, outputs, stream, execute_async_func)
Performance benchmark
Overall, using the DSSM model above as the example with batch size 1, TensorRT sustains by far the highest concurrency, with excellent performance:
Native PyTorch: 141 concurrent requests
torch.compile: 182 concurrent requests
TorchScript: 222 concurrent requests
onnxruntime-gpu: 218 concurrent requests
TensorRT: 490 concurrent requests