b站链接:合集·Triton 从入门到精通
算法名词解释:
scheduler 任务调理器
- Triton中的scheduler通常指的是任务调理器(scheduler),它是一种用于管理和调理计算机集群中任务执行的软件组件。在Triton中,scheduler的作用是管理GPU资源的分配和任务的调理,以确保系统的高效利用和任务的顺利执行。
- 详细来说,Triton的scheduler负责以下几个方面的功能:
- 资源管理:scheduler跟踪和管理GPU资源的可用情况,确保任务能够得到得当的GPU资源分配。
- 任务调理:根据任务的优先级、资源需求和系统负载等因素,scheduler决定何时执行哪些任务以及在哪些GPU上执行。
- 负载平衡:scheduler通过在集群中动态分配任务和资源,实现负载平衡,以最大水平地提高系统的团体性能。
- 故障恢复:scheduler能够检测并处置惩罚集群中的故障情况,例如GPU节点的故障或任务执行失败,以确保任务能够在其他可用资源上重新启动或恢复。
model instance、inference和request
- Model Instance(模子实例):
- 模子实例指的是摆设在计算机系统中的一个特定的模子的实例化对象,它是模子在运行时的一个详细实例。在深度学习模子摆设中,通常会将训练好的模子加载到内存中,创建模子实例以供推理(inference)利用。
- 一个模子可以有多个实例运行在差别的计算节点上,如许可以提高系统的并发性和吞吐量。
- Inference(推理):
- 推理是指利用已经训练好的模子对输入数据举行预测或分类的过程。在深度学习中,推理阶段是模子摆设的焦点任务,通过输入数据经过模子实例举行前向传播计算,得到输出结果。
- 推理通常是模子摆设中的主要活动,用于现实应用中对新数据举行预测或分析。
- Request(请求):
- 请求是指外部系统向模子摆设服务发送的请求,请求通常包罗必要举行推理的输入数据。请求可以是针对单个样本的推理请求,也可以是批量处置惩罚多个样本的请求。
- 请求通常包罗了必要对数据举行推理的详细信息,好比输入数据、盼望的输特别式等。
- 这些概念之间的联系可以总结如下:
- 外部系统通过发送请求(request)向摆设的模子实例发送数据,请求可以包罗一个或多个必要举行推理的输入数据。
- 模子实例吸收到请求后,对输入数据举行推理(inference)操作,利用模子举行计算,得出预测结果。
- 推理的结果可以通过响应(response)返回给外部系统,用于后续的处置惩罚或展示。
- 这些概念在模子摆设过程中起着重要的作用:
- 模子实例负责加载和运行模子,处置惩罚来自外部系统的推理请求。
- 推理是模子实例执行的主要任务,它对输入数据举行处置惩罚,天生输出结果。
- 请求是外部系统与模子实例之间举行通信和交互的方式,通过发送请求,外部系统可以向模子实例提交必要举行推理的数据。
batching
- 在大模子摆设中,“batching” 是指将多个输入样本一起发送到模子举行推理的过程,而不是逐个样本举行推理。这个过程通常发生在推理阶段,也就是在模子实例吸收到推理请求后,对输入数据举行处置惩罚的阶段。
- 作用:
- Batching 的主要作用在于提高推理过程的效率和性能,详细表现在以下几个方面:
- 减少推理耽误:通过批量处置惩罚多个样本,可以减少推理请求的数量,从而减少团体推理过程的耽误。相比逐个样本举行推理,批处置惩罚可以更有效地利用计算资源,提高推理的并发性和吞吐量。
- 优化计算资源利用:在深度学习中,通常会利用并行计算来加速推理过程。通过 batching,可以使得计算图中的操作能够更好地被并行执行,充分利用计算资源,提高系统的团体效率。
- 降低通信开销:在分布式环境下,将多个输入样本打包成一个批次发送到模子实例,可以减少网络通信的开销。这对于跨网络较大的推理请求尤其重要,可以减少数据传输的时间和带宽斲丧。
- 例子:
- 假设有一个图像分类任务,必要对一批图像举行分类,每个图像的巨细为 224x224 像素。如果单独对每个图像举行推理,那么对于每个图像,模子都必要举行一次前向传播计算,如许会造成大量的计算资源浪费和推理耽误。
- 而如果利用 batching,将多个图像打包成一个批次发送到模子举行推理,好比将 32 张图像作为一个批次。如许,模子可以一次性并行处置惩罚全部 32 张图像的推理请求,大大提高了计算资源的利用效率,并且减少了团体推理时间。
- 因此,通过 batching,可以在大模子摆设中有效地提高推理过程的效率和性能,特别是在处置惩罚大规模数据时尤为重要。
一、Triton Inference Server原理
1. Overview of Trition
- K8S cluster:在一个容器化的服务编排工具会合,摆设和管理一个推理服务(Inference Service),该服务能够利用多个节点和多个加速卡(如 GPU)举行高效的推理计算。
- Triton:一个利用单个呆板学习模子(mode),运行在一个容器内的推理服务。这个服务可以利用一个或多个 GPU 举行加速计算。
- TensorRT:一种用于加速神经网络(Neural Network, NN)模子推理过程的库。这些库通常包罗优化算法、硬件加速支持和其他技能,以提高神经网络模子在推理(即预测或分类)过程中的速度和效率。
- Triton基本功能:
- 支持多个框架(TensorFlow,PyTorch,TensorRT,ONNX RT,custom backends)
- 支持CPU,GPU和多GPU
- 可以支持多线程在同一时间内同时运行多个呆板学习模子的过程。
- 接受服务:支持http/rest grpt apis
- 系统或服务能够与编排系统和自动扩展工具举行集成,并基于耽误和康健状态指标来自动调整资源和优化性能。
- 模子管理:加载/卸载,模子更新…
- 在github和NGC上开源
2. Design Basics of Trition
- 基于推理生命周期的
- 支持的多个模子框架->这取决于框架,应该是用backend的方式来实现对差别模子的支持
- 共同功能:
- 公共后端管理
- 模子管理-加载/卸载、模子更新、查询模子状态等。
- 并发模子执行-实例管理
- 请求队列调理器
- 推理生命周期管理
- GRPC相关
GRPC服务器
- 基于模子,大抵可分为三种类型
- 单一的没有依靠的模子
- 差别模子的组合,模子之间有顺序
- 有状态的模子:语言类模子(模子内部有顺序)
- 多线程思绪:
1. 对单一模子启动多个线程举行推理
2. 对多个模子举行多线程推理
- 三种模子:
- 无状态模子
- 一样平常是CV模子
- 两种分配方式:匀称分配(默认调理步伐),动态批处置惩罚(提高吞吐率)
- 有状态模子(预测结果取决于先前的序列)
- 一样平常是NLP模子,模子内部是有顺序的
- 顺序批处置惩罚:直接的,最古老的
- 组合模子:有依靠关系的
- 一条模子管道
- 每个模子内部调理顺序无所谓,但模子与模子之间有调理顺序。
3. Auxiliary Features of Trition
- model analyzer:
- 模子分析器是一套工具,它提供关于如何基于特定性能优化Triton中的单个或多个模子所需,帮助用户围绕吞吐量、耽误和GPU内存做出权衡决策。
- 选择正确的模子设置时的占用空间
- 现有的两个基准功能:
- 性能分析-度量变化下的吞吐量(inf/S)和耽误
- 内存分析–测量模子在差别内存占用情况下的GPU内存足迹
4. Additional resources
5. In-house Inference Server vs Trition
6. 编程实战
6.1 Prepare the Model Repository
- 我们必须正确组织好model_reposity后,triton_server才能正确加载模子到服务端。
- 模子目录中三个基本的components
6.2 Configure the Served Model
- 基本组件:
max_batch_size:一样平常不会超过GPU的显存/内存。
- instance groups:在一个triton上对一个模子在gpu上并行运行多个instance,增加吞吐率
- scheduling和Batching决定了triton该利用哪种调理策略去调理请求。
- 加速器
- 热身
6.3 Launch Triton Server
- Simplest Way
- –gpus all :选择可以看到哪些GPU
- -it:interactive 表示开启容器后可以在容器内部举行一些交互
- –rm:当container任务完成后自动关掉container
- –shm-size:指定container可以访问的共享内存的巨细
- -p:指定必要监听的端口,(host端口:映射到container的端口)【8000端口用于http的访问;8001用于 grpc的访问;8002用于?的访问】
- -v:目录的映射,将host中的目录映射到container中的目录
- 最后是triton docker的名称
- run:指定好model_repository的路径即可运行文件
- 检查模子状态:
6.4 Configure an Ensemble Model
- 有顺序模子的组合:value用来毗连差别的step
- Notice:
- 如果其中一个模子是有状态模子,那么推理请求应该包罗有状态模子中的信息。
- 组合模子有自己的调理步伐。
- 如果集成中的模子是框架backend(TensorFlow这种),则它们之间的数据传输不必经过CPU存储器。
6.5 Send Requests to Triton Server
- 在triton server准备好时,如何向server发送请求
- client发送请求的方式:http,grpc,capi
- 通过shared memory传递数据
二、Triton Backend详细教程
1. Overview
1.1 什么时候实现backend?
- 公司想在triton上用自己的深度学习框架:
- 腾讯 - TNN
- 百度 - Paddle
- 英伟达 - HugeCTR
- 客户想用一些custom module和一些主流的深度学习连合在一起成为一个pipeline:
- 预处置惩罚
- 后处置惩罚
- 一些在主流框架中没有实现的模块
1.2 实现backend过程中必要实现哪些内容?
- 名词:
- backend:
- model:模子,像tensorflow这种
- Execute inference:instance,可以同时多个在GPU/CPU上跑
- 接口寄义:
- TRITONBACKEND_Backend对象会把backend对象放到TRITONBACKEND_initialize里面,然后在该函数中对传进来的函数做一些初始化的操作。
- TRITONBACKEND_Modelinitialize:初始化model对象(初始化一些模子共有的属性:模子名称、输入输出,在model configure中定义的那些)。
- TRITONBACKEND_ModelInstanceInitialize:对instance做一些初始化(在哪个设备上运行、包罗一切和instance相关的内容)
- TRITONBACKEND_ModelInstanceExecute:做某一个模子推理时执行的函数。
- TRITONBACKEND_Finalize、TRITONBACKEND_ModeFinalize、TRITONBACKEND_ModelInstanceFinalize:做一些收尾的工作。
- ModelExecution的代码就是写在ModelState和ModelInstanceState中的。
- ModelState:
- 依附于Model对象。
- 维护与Model和ModelInstance相关的全部的属性,包罗模子名称、输入输出、max_batch_size等。
- 实行推理:LoadModel——如何把模子从文件读入到TRITONBACKEND文件来的函数。
- ModelInstanceState:
- 依附于ModelInstance对象。
- 维护ModelInstance的相关信息,包罗instance在哪个设备上运行等。
- SetInput Tensors:用来准备模子推理的输入数据的。
- Execute:举行模子推理。
- readOutput Tensors:把模子推理完的结果的内容拿出来,并且返回给TRITON。
1.3 为什么triton是这么计划去实现一个backend的?
- backend的作用:TRITON对于差别的backend都要有一套同一的机制去运行和管理,至于模子的效果是backend内部的内容,TRITON只负责调用。
- ModelState和ModelInstanceState两个状态类的作用:接口是纯C的函数,不是面向对象的,但同一时间可能有很多instance都要调用这些接口。为了使差别instance的推理可以安全和独立,以是这些接口函数应该在差别instance内部去执行。
- 为什么不将接口函数计划为虚基类:为了解耦backend和triton的主流程。这些接口都写在一个.cc文件中,如果想改变backend或对该backend添加一些新功能时,只必要重新编译该.cc文件即可,不必要重新编译整个TRITON。
2. Code Exploration
2.1 留意事项
- 差别的model instance可能是run在差别的device上的。差别的device要分配自己的内存和推理,以是要留意device_id。
- 批处置惩罚(batching):要backend自己实现。TRITON的pipeline只是把要打batch的requests集合在一个列表中,它自己没有做batching的工作。在执行完推理后,要把输出的数据拆散,还给差别的requests。
- 留意requests对象的管理:
- requests对象是在backend表面创建的,是TRITON创建好了后送进来的。但是执行完成后必要由backend自己来销毁。
- 如果requests发生了问题(空,超过max_batch_size,大概里面获取不到input tensor),则要实时停止整个推理流程并且release掉这些requests对象。
- responses对象的管理:
- 是在backend内部创建的,且不必要在由我们来释放,它是在TRITON pipeline中释放和销毁的。如果我们不慎销毁了response,会导致TRITON pipeline无法把response中的结果返回给用户。
- 当推理发生错误时,要立即返回ERROR response。
2.2 backend写好后是如何编译、build的?
- 要仿照一个线程的CMakeLists写一个自己的CMakeLists。
- 仿照TritonPyTorchBackendConfig.cmake.in写一个TritonPyTorchBackendConfig.cmake.in文件。
- 将libtriton/pytorch.ldscript 复制到你的backend文件夹中。(内容都是一样的)
- build:
- 只buildbackend自己:
- 用cmake去build,用make去编译得到backend.so。
- 将backend.so以特定文件格式copy到tritonserver/backends/中。
- 和整个triton一起build:
- 运行build.py文件,build.py文件是用来编译整个TRITON的,它在编译triton的过程中会编译每一个backend。
- 用 "–backend 你的backend名称:<container_tag> "选项来编译你自己的backend。
3. Summary
- backend有三个层级:backend,model和modelInstance。这三个类都是由Triton pipeline预实现和管理的,不必要由我们实现。
- 必要我们完成的工作:
- 必要为backend实现初始化、完成、执行(Initialize,Finalize,Execute)接口。
- 重点实现ModelState。它负责模子读取和维护模子相关的属性,并附加(attach)到BackendModel中去。
- 重点实现ModelnstanceState。它是真正负责推理的类,它负责发送推理完的responses,准备输入数据,负责维护和modelInstance相关的属性和数据,并将其附加(attach)到BackendModelInstance对象上。
- SetinpuTensors():举行批处置惩罚,准备输入张量
- EXECUTE():模子推断
- ReadOutputTensors():读取输出和发送响应
- 留意事项:
- 留意设备(device_id)。
- 批处置惩罚:从全部的requests中将input date汇聚成大batch,并将输出拆开,还给全部的requests对应的responses。
- 审慎管理requests和responses对象的创建和销毁。
- 编译backend:
- 编写CMakeLists.txt。
- 编译backend,最后将共享库复制到Triton backend目录中。
三、Triton Python Backend & BLS Deep Dive
1. Python Backend(用python实现custom backends)
1.1 Triton工作原理
- 模子推理摆设流程:triton上摆设的全部模子都是通过backend摆设在服务器上的。例如我们训练好一个tensorflow的模子,我们就用tensorflow的backend去起一个/多个的model instance,将这些model instance放在CPU/GPU上去执行推理。
- 并发执行原理:对于每一个backend来说,它里面的每一个model instance是由一个线程来管理的。因此,我们可以在一个triton sever上对一个model起多个model instance,让这些model instance并行地去做推理的执行。
- 客户端:可以用triton提供的custom的api去实现自己想要的恣意的操作逻辑,包罗前后处置惩罚。
1.2 为什么我们必要Python backend?
- 很多预处置惩罚/后处置惩罚模块都是用Python实现的(例如:在pipeline中加入前后处置惩罚的模块)。
- 我们现在已经有一些用python编写的处置惩罚单位,我们想把这些处置惩罚单位放到triton中去摆设。我们可以用Python代码将其打包到Triton模子中。
- 比C++自定义backend更容易编写,不必要编译。
1.3 工作原理

- python backend的作用:
- 全部backend都要遵照7大关键接口,且每一个backend实例要在一个triton线程中管理。虽然Triton同样要根据7大接口来实现通用backend框架,但是用C++写的backend仅仅是ttriton关于python backend的一个agent(代理),它里面并不包罗真正的python backend的执行逻辑。由C++实现的python backend也是由线程管理的,和Triton server主线程都存在于一个主历程内。
- 最后一列的python model才是python backend真正要实现的内容,python model是由历程管理的。以是triton是通过共享内存shard mem让triton server和python model举行一个交互。对每一个python model都建立一个shm block来负责python model与triton server之间的通信。
- shared mem必要负责哪些内容:
- health flag:康健标志位,标志python backend的模子实例是不是正常。我们在每次做triton backend和python backend通信时,不管是input通信照旧output通信,都会先把health tag置为false。而python backend会每隔300ms将health tag置为true。
- Request Message(信息队列):将triton的数据传给python model instance。每当一个request送到triton server之后,C++ python backend agent会将该request以message的形式入队到messageQ中,MessageQ中的数据会被取出到现实的python backend中。
- Response MessageQ(信息队列):当python backend运算完成后,它的输入同样会以message的形式存放到response MessageQ中。MessageQ中的output会被C++ python backend agent取出,并打包成response发回给客户端。
- Request MessageQ和Response MessageQ中的inputQ和outputQ都是生产者消费者模子,我们必要维护两个信号量来管理生产者消费者问题。
1.4 如何实现?
- 在python backend中只必要实现3个关键接口。
- 例子1:
- “rn50_onnx_pb.py”
- import triton_python_backend_utils as pb_utils
- import onnxruntime
- import json
- import os
- # 类的名字不可改
- class TritonPythonModel:
- """Your Python model must use the same class name. Every Python model
- that is created must have "TritonPythonModel" as the class name.
- """
- def initialize(self, args):
- """`initialize is called only once when the model is being loaded.
- Implementing initialize function is optional. This function allows
- the model to initialize any state associated with this model.
- Parameters
- ----------
- args : dict
- Both keys and values are strings. The dictionary keys and values are:
- *model config:A JSON string containing the model configuration
- model instance kind: A string containing model instance kind
- model instance device id: A string containing model instance device ID
- model repository: Model repository path
- model version: Model version
- model name: Model name
- """
- # 获取模型的config:You must parse model config. JSON string is not parsed here
- self.model_config = model_config = json.loads(args['model_config'])
-
- # 取出输出的config:Get OUTPUT configuration
- output_config = pb_utils.get_output_config_by_name(model_config, "output")
-
- # 获取输出的data_type:Convert Triton types to numpy types
- self.output_dtype = pb_utils.triton_string_to_numpy(output_config['data_type'])
-
- # 获取python脚本的路径:Get path of model repository
- self.model_directory = os.path.dirname(os.path.realpath( file ))
-
- # 通过上述路径生成onnx model的路径,创建onnx模型处理的session:Create nnx runtime session for inference
- self.session = onnxruntime.InferenceSession(os.path.join(self.model_directory, 'model.onnx'))
-
- print('Initialized...')
-
- def execute(self,requests):
- """`execute` must be implemented in every Python model.
- `execute` function receives a list of pb utils.InferenceRequest as the only
- argument. This function is called when an inference is requested for this model.
- Parameters
- ------------------
- requests :list 可能包含一个/多个request
- A list of pb utils.InferenceRequest
- Returns
- ------
- list
- A list of pb utils.InferenceResponse. The length of this list must
- be the same as requests.
- """
- output_dtype = self.output_dtype
-
- responses = [] #返回值
-
- # Every Python backend must iterate through list of requests and create
- # an instance of pb utils.InferenceResponse class for each of them. You
- # should avoid storing any of the input Tensors in the class attributes
- # as they will be overridden in subsequent inference requests. You can
- # make a copy of the underlying NumPy array and store it if it is required.
-
- for request in requests:
- # Get INPUT
- # 这个输入的tensor是python backend的tensor
- input_tensor = pb_utils.get_input_tensor_by_name(request, "input")
- input_array = input_tensor.as numpy() #转换为numpy array,为了让input tensor可以去onnx里做input
-
- # 跑onnx的session(会话)得到输出的tensor(prediction):Run inference with onnxruntime session
- prediction = self.session.run(None,{"input": input_array})
- # 将prediction转换为config文件中定义好的output datatype,再打包为python backend的output tensor:Pack output as python backend tensor
- out_tensor = pb_utils.Tensor("output", prediction[0].astype(output_dtype))
-
- # 将out_tensor转换为request对应的输出tensor
- inference_response = pb_utils.InferenceResponse(output_tensors=[out_tensor])
- responses.append(inference_response)
-
- # You must return a list of pb utils.InferenceResponse. Length
- # of this list must match the length of `requests` list.return responses
- return responses
-
- def finalize(self):
- """ finalize is called only once when the model is being unloaded
- Implementing finalize`function is optional. This function allows
- the model to perform any necessary clean ups before exit.
- """
- print('cleaning up...')
-
复制代码
- 例子2:
- 默认情况下,发送到Python backend的tensor将自动复制到cpu上。
- 要在GPU上保持tensor,必要在设置文件中设置以下内容:
- parameters:{ key:“FORCE CPU ONLY INPUT TENSORS” value: (string value:“no”} }
- 如许,传入python backend的input tensor将不会被强制转换为CPU上。
- 你可以通过以下方法来检查tensor的位置:
- Pb_utils.Tensor.is_CPU()
- 可以检查tensor到底在CPU照旧GPU上
- 例子3:
- 将pytorch推理的过程放到GPU上去运行。
- “rn50_torch_pb.py”

- 设置custom python execution environment
- 留意事项:
- 手动指定模子跑在什么设备上。
- requests是没有被批处置惩罚的,它是一个requests列表,必要手动批处置惩罚。
- 对于每个request,必须发送一个response。
- 将支持将来的解耦response:在C++的custom backend支持一收多发、一收不发、一收一发大概多收一发、多收不发、多收多发。
- 在python backend中必须一收一发。
- FORCE_CPU_ONLY_INPUT_TENSORS 参数是必要的,以制止cpu-gpu副本。
- 数据传输必要大的共享内存:每个实例至少必要64 MB的SHM。
- python backend绝对不如C++ backend有效,特别是对于循环。
2. BLS:Business Logic Scripting
2.1 何时利用?
- 动态pipeline:运行时根据output1的值动态地决定要去哪个分支。
- 动态pipeline用triton实现不了,可以用BLS实现。
2.2 如何实现?
- 同步执行:只有当得到了inference_response之后,才能去执行后续的代码(if else)。
- def execute(self,requests):
- responses = []
- input_tensors = []
- batch_sizes = []
- # Every Python backend must iterate over everyone of the requests
- # and create a pb utils.InferenceResponse for each of them.
- for request in requests:
- # Get INPUT Triton]tensor
- in_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT" )
- # Convert to torchtensor
- pytorch_tensor = from_dlpack(in_tensor.to_dlpack())
- input_tensors.append(pytorch_tensor)
- batch_sizes.append(pytorch_tensor.shape[0])
-
- # Concat input tensor in all requests into batch
- batch_input_tensor = torch.cat(input_tensors, axis=0).to('cuda')
- # 函数的第一个参数要填写你即将请求的模块的参数(config中)(INPUT_0)
- batch_input = pb_utils.Tensor.from_dlpack("INPUT_0",to_dlpack(batch_input_tensor))
-
- # Make inference request for the first BLs call to preprocess
- infer_request = pb_utils.InferenceRequest(
- model_name = 'preprocess',
- requested_output_names = ["OUTPUT_0"],
- inputs = [batch_input])
-
- # 依次调用BLS的模块
- # 调用预处理,proprocess模块:First BLS call to preprocess
- batch_preprocess_response = infer_request.exec()
-
- # Extract output tensor from the resposne of first call
- batch_preprocess_output = pb_utils.get_output_tensor_by_name(batch_preprocess_response, 'OUTPUT_0')
-
- # Make inference request for the second BlS call to classifier
- # 对inference_request来说,它的pb tensor的名字必须和请求的模块(model_name)的输入tensor一致。
- infer_request = pb_utils.InferenceRequest(
- model_name = 'classifier'
- requested_output_names =「"OUTPUT_0"],
- inputs = [pb_utils.Tensor.from_dlpack('INPUT_0',batch_preprocess_output.to_dlpack())])
-
- # 运行第二个模块:Second BLS call to classifier
- batch_classifier_response = infer_request.exec()
-
- # 判断分类器的分类结果
- # 把分类器的分类结果拿出来:Extract output tensor from the resposne of second call
- batch_classifier_output = pb_utils.get_output_tensor_by_name(batch_classifier_response, 'OUTPUT_0')
- # 将结果转为torch tensor:Convert classifier output to torch tensor, shape 「batch size, 1000]
- batch_classifier_tensor = from_dlpack(batch_classifier_output.to_dlpack())
- # 找出概率值最大的那个:Get the category indices from classifier output, shape [ batch size ]
- batch_class_ids = torch.argmax(batch_classifier_tensor, dim=1)
-
- batch_seg_input = pb_utils.Tensor.from_dlpack("input", batch_preprocess_output.to_dlpack())
- #判断图片是不是猫/狗:Check if the input images contain cat or dog
- if 283 in batch_class_ids or 263 in batch_class_ids:
- # if the input images contain cat or dog, segment with deeplabv3_rn50
- # Make inference request for the third BLs call to deeplabv3 rn50
- infer_request = pb_utils.InferenceRequest(
- model_name = 'deeplabv3 rn50',
- requested_output_names = ["out",'aux'], #config里定义好的
- inputs = [batch_seg_input])
- # 运行第一个分割模块:Third BLs call to deeplabv3 rn50
- batch_seg_response = infer_request.exec()
- else:
- # if the input images do not contain cat or dog, segment with fcn_resnet50
- # Make inference request for the third BLS call to fcn resnet50
- infer_request = pb_utils.InferenceRequest(
- model_name = 'fcn resnet50',
- requested_output_names=["out",'aux']
- inputs = [batch_seg_input])
- # 运行第二个分割模块:Third BLs call to deeplabv3 rn50
- batch_seg_response = infer_request.exec()
-
- # 后处理操作:Get segmentation output tensor from response
- batch_seg_output = pb_utils.get_output_tensor_by_name(batch_seg_response, 'out')
- batch_seg_tensor = from dlpack(batch_seg_output.to_dlpack())
- batch_seg_tensor = torch.softmax(batch_seg_tensor, dim=1)
- batch_seg_tensor = batch_seg_tensor*255.0
- batch_seg_tensor = batch_seg_tensor.type(torch.uint8)
-
- # Define the mapping between classifier id and segmentation id
- class_seg_id_map = {817:7,283:8,263:12,339:13,681:0,665:14, 176:13}
- batch_classes = batch_class_ids.cpu().detach().numpy()
-
- # Extract output data from batched tensor to individual response
- cursor = 0
- for i in range(len(requests))
- batch_size = batch_sizes[i]
- batch_mask = []
- for j in range(cursor, cursor + batch_size):
- cls = class_seg_id_map[batch_classes[j]]
- print(cls)
- mask = batch_seg_tensor[j,cls,:,:]
- batch_mask.append(mask)
- batch_mask_tensor =torch.stack(batch_mask, dim=0)
- print(batch_mask_tensor.shape)
- batch_output = pb_utils.Tensor.from_dlpack('OUTPUT', to_dlpack(batch_mask_tensor))
- responses.append(pb_utils.InferenceResponse(output_tensors=[batch_output]))
-
- cursor += batch_size
- return responses
复制代码


2.3 工作原理?
- BLS的本质是python backend。
- 在python backend中,首先准备一个BLS Request去调用step的模子。调用过程会把request传递给python stub 历程,历程会把该request存入shared mem中。
- triton的python backend agent会将shared mem中的request input取出,调用triton的api将input送入你指定的模子上去运行。而后将输出结果存入shared mem里面,python stub历程从中将输出结果取出,传给python的model。
- 在python backend agent中会运行一个while(InferExecRequest) 循环,这个标志位也是存在shared mem中,由python stub历程设置的。如果python model不绝没执行完,那么该标志位就会不绝是true,直到执行完最后一个模块,该标志位才会被置为false。
- Notice:
- 内存复制开销:
- 对于cpu管道,输入1份(将输入从shared mem拷贝到两个模块上),输出2份(第一次是将模块的输出tensor拷贝到shared mem中,第二次是将shared mem中的输出tensor拷贝出来,让triton返回给客户端)。
- 对于GPU管道,利用cudalPC,非常小的开销。(不必要做shared mem到模子空间的拷贝,直接走cudaIPC)
- BLS不支持pipeline并行。
- FORCE_CPU_ONLY_INPUT_TENSORS 参数是必要的,以制止cpu-gpu副本。
- 总结:
四、Triton Stateful Model
1. Application
2. Stateful Models
- 定义:模子中必要把同一个sequence中的差别的请求之间的状态维护住,且模子每次拿到的请求一定是把全部inferences定向到一个instance里面,因为模子的state是与instance绑定的。
- 用什么方法能把全部的请求定向到一个instance里面?用sequence batcher。
- 怎么设置control signals控制信号?要在客户端中设置信号且模子是能够辨认这些信号的。
2.1 Sequence batcher
2.2 Control Inputs
2.3 Direct & Oldest
- 打batch的两种方法:direct和oldest
- direct:
- 把每个sequence和每个instance上的每一个slot做一个绑定。
- slot就是batch中第几个的位置。
- sequence instance只负责定向到每个batch上,而direct还负责定向到每个slot上。
- 参数:
- max_queue_delayr_microseconds
- float_minimum_slot_utilization
- oldest:
- slot不必要和某个sequence绑定。
- 包管一个batch里面是来自差别的sequence
2.4 Streaming client
3. Practice : CTC Streaming

- import numpy as np
- import json
- # triton python backend utils is available in every Triton Python model. You
- # need to use this module to create inference requests and responses, It also
- #contains some utility functions for extracting information from modol confia
- #and converting Triton input/output types to numpy types.
- import triton_python_backend_utils as pb_utils
- from multiprocessing.pool import ThreadPool
- class Decoder(obfect):
- def init (self,blank):
- self.prev = ''
- self.result = ''
- self.blank_symbol = blank
-
- def decode(self,input,start, ready): # 进行解码
- """
- input: a list of characters/a string
- """
- if start:
- self.prev = ''
- self.result = ''
- if ready:
- for li in input.decode("utf.8”):
- if li != self.prev:
- if li != self.blank_symbol:
- self.result += l1
- self.prev = li
- r = np.array([[self.result]])
- return r
-
- class TritonPythonModel:
- """Your Python model must use the same class name. Every Python model
- that is created must have "TritonPythonodel" as the class name.
- """
-
- def initialize(self, args):
- """initialize`is called only once when the model is being loaded.
- Implementing `initialize`function is optional. This function allows
- the model to intialize any state associated with this model.
- Parameters:
- ----------------
- args :dict
- Both keys and values are strings. The dictionary keys and values are:
- *model config: A JSON string containing the model configuration
- *model instance kind: A string containing model instance kind
- *model instance device id: A string containing model instance device ID
- *model repository:Model repository path
- *model version: Model version
- *model name: Model name
- """
- # 加载模型配置文件:You must parse model config. JSON string is not parsed here
- self.model_config = model_config = json.loads(args['model_config'])
-
- # get max batch size
- max_batch_size = max(model config["max_batch_size"],1)
-
- # 读取blank符号:get blank symbol from config
- blank = self.model_config.get("blank_id",'.')
-
- # 将解码器的数量初始化为max_batch_size:initialize decoders
- self.decoders = [Decoder(blank)for i in range(max_batch_size)]
-
- # Get 0UTPUT configuration
- output0_config = pb_utils.get_output_config_by_name(model_config,"OUTPUTO")
- # Convert Triton types to nupy types
- self.output0_dtype = pbvuils.triton_string_to_numpy(outpute_config['data_type'])
-
- def batch_decode(self, batch_input, batch_start, batch_ready):
- responses = []
- args = []
- ldx = 0
- for i,r,s in zip(batch_input, batch_ready, batch_start):
- args.append([lidx,1,r,s])
- idx += l
- with ThreadPool()as p:
- responses = p.map(self.process_single_request, args)
- return responses
-
- def process_single_request(self,inp):
- decoder_idx,input,ready,start = inp
- # 对每一个request都调用对应的decoder对它进行一个操作
- response = self.decoders[decoder idx].decode(input[e], start[0], ready[0])
- out_tensor_0 = pb_utils.Tensor("0UTPUTo", response.astype(self.output0 dtype))
- inference_response = pb_utils.InferenceResponse(output_tensors = loqt_tensor_0])
- return inference_response
- def execute(self,requests):
- """execute’ MuST be implemented in every python model. 'execute
- function receives a list of pb utils.InferenceRequest as the only
- argument. This function is called when an inference reguest is made
- for this model, Depending on the batching configuration (e.g. Dynamic
- Batching)used,"reguests may contain multiple requests. Every
- Python model, must create one pb utiis.inferenceResponse for every
- pb utils.InferenceRequest in "requests . if there is an error, you can
- set the error arqument when creating a pb utils.InferenceResponse
- Parameters
- ----------------------------
- requests :list
- A list of pb utils.InferenceReguest
- Returns
- ----------------------------
- list
- A list of pb utils.InferenceResponse, The length of this list must
- be the same as "requests
- """
- # print("START NEW")
- responses = []
-
- batch_input = []
- batch_ready = []
- batch_start =[]
- batch_corrid =[]
- # Every Python backend ausr iterate over everyone of the reguests
- # and create a pb utils.InferenceResponse for each of them.
- for request in requests:
- #Get INPUTe
- in_0 = pb_utils.get_input_tensor_by_name(request, "INPUT")
- #in 8> <triton python backend utils.Tensor object
- #in 0> ndarray!'xcx }
- batch_input += in_0.as_numpy().tolist()
-
- in_start = pb utils.get_input_tensor_by_name(request, "START")
- batch_start += in_start.as_numpy().tolist()
-
- in_ready = pb_utils.get_input_tensor_by_name(request, "READY")
- batch_ready += in_ready.as_numpy().tolist()
-
- in_corrid = pb_utils.get_input_tensor_by_name(request, "CORRID")
- batch_corrid += in_corrid.as_numpy().tolist()
-
- #print("corrid",batch corrid)
- #print("batch input".batch input)
- responses = self.batch_decode(batch_input, batch_start, batch_ready)
- #print("batch response",responses)
- # You should rerurn a list of pb utils.InferenceResponse. Length
- #of this list must match the lenoth ofreguests list
- assert len(requests) == len(responses)
- #printf"send responses":responses)
- return responses
- def finalize(self):
- """finalize’is called only once when the model is being unloaded
- Implmenting “finalize" function is OPTIONAL. This function allows
- the model to perform any necessary clean ups before exit.
- """
- print('cleaning up...')
复制代码 4. Practice : WeNet
5. Summary
- 流程:
- 模子初始化:从config文件中将参数读取进来初始化我们的模子,将我们的模子放到GPU上去。
- 模子运行:对于每一个请求,先检查下其start状态,将其初始化。将请求相关的信息都batching,然后用模子去做推理。做完推理后更新全部的state,将response返回给客户端(用户)。[可以在CPU上写多线程代码,用swig包一下,然后在python backend中用python代码调用C++代码,就可以绕过全局解释的问题了。]
- 模子卸载:在model finalize中把全部模子都清掉,将state也扫除掉。
五、Triton 优先级管理
- 如安在triton中举行优先级的管理?给差别的request和model设置优先级信息?
- request queue:可以对request举行优先级分别及管理。
- rate limiter:可以对model instance举行优先级管理。
Triton Priority Queue
工作原理
- 在triton中有很多差别的scheduler(任务调理器)环境举行request以及model的调理。差别的scheduler适用于差别model的类型。
- 一样平常一个模子只有一个调理队列(scheduler queue),吸收到的全部的request都会按照顺序放入scheduler queue当中。无论模子有几个model instance,默认都只有一个scheduler queue。如果有空闲的model instance,那么request队列中的request就会按照先后顺序被调理到空闲的model instance上去一些inference推理。
- dynamic batching中priority queue的运行模式:
- 在dynamic batching中,我们可以设置priority queue来举行request优先级的管理和调理。在dynamic batching中,我们可以设置多个优先级,拥有多个差别的优先级的队列,priority_n,n越大,优先级越低。
- 当我们发送请求时,必要在客户端给请求设置一个优先级。当server吸收到请求后,就会把请求加入到相应的优先级队列中,去等待调理和执行。假如有多个空闲的model instance,那么它会去首先执行高优先级队列中的request。
- 缺点:
- 如果客户端没有指定request的priority咋办?
- 低优先级的request可能会饿死。
- 怎样控制queue的巨细?
- ensemble model会组合多个模子,如果其中一个模子利用到了dynamic batching且设置了priority queue,那么在dynamic batching中是否适用?(依然有效)
如何利用?
- 利用的步骤
- 设置config文件,在config文件中在dynamic batching的设置项中设置priority的字段。
- load模子。
- 通过client api向server发起请求。
- config文件的参数:
- “client发送请求”
- # 发送n个请求
- for i in range(inference_count):
- if i % 2 == 0:
- priority = 1
- else:
- priority = 2
-
- client.async infer(
- model_name,
- inputs,
- partial(completion_callback, user_data)
- request_id = str(i),
- model_version = str(1),
- # timeout=1000,
- outputs = outputs,
- priority =priority)
- processedcount =0
- while processed_count < inference_count:
- (results,error) = user data. completed_requests.get()
- processed_count += 1
- if error is not None:
- print("inference failed:"+ str(error))
- # sys.exit(1)
- continue
- this_id = results.get_response().id
- if int(this_id)%2 == 0:
- this_priority = 1
- else:
- this_priority = 2
- print("Request id {} with priority {} has been executed".format(this id, this priority))
复制代码 Rate Limiter
原理
- 当客户端发送了很多request,server吸收了request之后,只要我们的model instance有空闲的,scheduler就会把request调理到空闲的model instance上,然后到硬件上执行model inference。
- 当request充足多,triton server会同时运行多个模子,多个model instance的计算的。可以提高GPU的利用率和网络的吞吐量。
如何利用?
- 利用rate limiter必要人为设置一个资源池,在资源池中可以设置很多种差别的资源以及它的数量。这些资源并不映射到任何硬件资源上,它只是我们人为设置的一个逻辑上的概念。
- 资源的分类:
- local resource:每个GPU都有的资源。
- global resource:不分GPU,整个triton server共用的资源。
- 当我们把request调理到空闲model instance上时,它首先要去资源池申请资源。只有当它把资源都申请到后,才能去执行inference。
- rate limiter起到一个限速的作用,它会使没有资源的model instance不能执行,只能等待。
- rate limiter可以选择把资源分配给哪个历程,从而给model instance制定优先级。
- 如何利用rate limiter?
- 在model的设置文件中举行rate limiter的设置。
- 如何设置资源池中的资源
- rate limiter的表现
总结
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |