LLamafactory API摆设与使用异步方式 API 调用优化大模型推理效率 ...

打印 上一主题 下一主题

主题 910|帖子 910|积分 2730

背景先容

第三方大模型API

如今,市面上有许多第三方大模型 API 服务提供商,通过 API 接口向用户提供多样化的服务。这些平台不仅能提供更多类别和范例的模型选择,还因其用户规模较大,能够以更低的成本从原厂获得服务,再将其转售给用户。别的,这些服务商还支持一些海外 API 服务,例如 ChatGPT 等,为用户提供了更加广泛的选择。


  • https://www.gptapi.us/register?aff=9xEy
比如上述网站以 API 接口的情势对外提供的服务,比官方的 API 要便宜。
装包:
  1. pip install langchain langchain_openai
复制代码
运行下述代码,完成上述网站的注册后,并填上述网站的 api_key 便可通过 python API 调用,就会收到 gpt-4o-mini 大模型的响应。
  1. from langchain_openai import ChatOpenAI
  2. llm = ChatOpenAI(
  3.     model="gpt-4o-mini",
  4.     base_url="https://www.gptapi.us/v1/",
  5.     api_key="sk-xxx", # 在这里填入你的密钥
  6.     )
  7. res = llm.invoke("你是谁?请你简要做一下,自我介绍?")
  8. print(res)
复制代码
先容

在摆设垂直范畴模型时,我们通常会对开源大模型举行微调,并获得相应的 LoRA 权重。在接下来的部分,我将先容如何使用 LLamafactory 将微调后的 LoRA 模型摆设为 API 服务。
在 Python 中调用 API 服务时,如果接纳同步方式举行请求,往往会导致请求速度较慢。因为同步方式需要在吸收到上一条请求的响应后,才能发起下一条请求。
为了解决这一题目,我将为大家先容如何通过异步请求的方式,在短时间内发送大量请求,从而提升 API 调用效率。
LLamafactory 摆设API

关于 LLamafactory 的下载与微调模型,点击查看我的这篇博客:Qwen2.5-7B-Instruct 模型微调与vllm摆设具体流程实战.https://blog.csdn.net/sjxgghg/article/details/144016723
vllm_api.yaml 的文件内容如下:
  1. model_name_or_path: qwen/Qwen2.5-7B-Instruct
  2. adapter_name_or_path: ../saves/qwen2.5-7B/ner_epoch5/
  3. template: qwen
  4. finetuning_type: lora
  5. infer_backend: vllm
  6. vllm_enforce_eager: true
  7. # llamafactory-cli chat lora_vllm.yaml
  8. # llamafactory-cli webchat lora_vllm.yaml
  9. # API_PORT=8000 llamafactory-cli api lora_vllm.yaml
复制代码
使用下述命令便可把大模型以 API 摆设的方式,摆设到8000端口:
  1. API_PORT=8000 llamafactory-cli api vllm_api.yaml
复制代码

LangChain 的 invoke 方法是常用的调用方式,但该方法并不支持异步操作。如果读者想了解同步与异步在速度上的差距,可以自行尝试使用一个 for 循环调用 invoke 方法,并对比其性能表现。
  1. import os
  2. from langchain_openai import ChatOpenAI
  3. client = ChatOpenAI(
  4.     model="gpt-3.5-turbo",
  5.     api_key="{}".format(os.environ.get("API_KEY", "0")),
  6.     base_url="http://localhost:{}/v1".format(os.environ.get("API_PORT", 8000)),
  7. )
  8. res = llm.invoke("你是谁?请你简要做一下,自我介绍?")
  9. print(res)
复制代码

在项目文件夹下,新建一个 .env 文件, 其中 API_KEY 的值便是API接口调用的 API_KEY。
  1. API_KEY=sk-12345678
复制代码
LLamafactory 通过API摆设的大模型所在是: http://localhost:8000/v1
API_KEY 是.env 文件中 API_KEY:sk-12345678
大模型 API 调用工具类

使用异步协程加快 API 的调用速度,可以参考我们前面的这篇文章:大模型 API 异步调用优化:高效并发与令牌池设计实践.https://blog.csdn.net/sjxgghg/article/details/143858730
我们在前面一篇文章的基础上,对异步类再封装了一下。
装包:
  1. pip install langchain tqdm aiolimiter python-dotenv
复制代码
  1. import os
  2. import random
  3. import asyncio
  4. import pandas as pd
  5. from tqdm import tqdm
  6. from typing import List
  7. from dataclasses import dataclass, field
  8. from aiolimiter import AsyncLimiter
  9. from langchain_openai import ChatOpenAI
  10. from dotenv import load_dotenv
  11. load_dotenv()
  12. def generate_arithmetic_expression(num: int):
  13.     """
  14.     生成数学计算的公式和结果
  15.     """
  16.     # 定义操作符和数字范围,除法
  17.     operators = ["+", "-", "*"]
  18.     expression = (
  19.         f"{random.randint(1, 100)} {random.choice(operators)} {random.randint(1, 100)}"
  20.     )
  21.     num -= 1
  22.     for _ in range(num):
  23.         expression = f"{expression} {random.choice(operators)} {random.randint(1, 100)}"
  24.     result = eval(expression)
  25.     expression = expression.replace("*", "x")
  26.     return expression, result
  27. @dataclass
  28. class AsyncLLMAPI:
  29.     """
  30.     大模型API的调用类
  31.     """
  32.     base_url: str
  33.     api_key: str  # 每个API的key不一样
  34.     uid: int = 0
  35.     cnt: int = 0  # 统计每个API被调用了多少次
  36.     model: str = "gpt-3.5-turbo"
  37.     llm: ChatOpenAI = field(init=False)  # 自动创建的对象,不需要用户传入
  38.     num_per_second: int = 6  # 限速每秒调用6次
  39.     def __post_init__(self):
  40.         # 初始化 llm 对象
  41.         self.llm = self.create_llm()
  42.         # 创建限速器,每秒最多发出 5 个请求
  43.         self.limiter = AsyncLimiter(self.num_per_second, 1)
  44.     def create_llm(self):
  45.         # 创建 llm 对象
  46.         return ChatOpenAI(
  47.             model=self.model,
  48.             base_url=self.base_url,
  49.             api_key=self.api_key,
  50.         )
  51.     async def __call__(self, text):
  52.         # 异步协程 限速
  53.         self.cnt += 1
  54.         async with self.limiter:
  55.             return await self.llm.agenerate([text])
  56.     @staticmethod
  57.     async def _run_task_with_progress(task, pbar):
  58.         """包装任务以更新进度条"""
  59.         result = await task
  60.         pbar.update(1)
  61.         return result
  62.     @staticmethod
  63.     def async_run(
  64.         llms: List["AsyncLLMAPI"],
  65.         data: List[str],
  66.         keyword: str = "",  # 文件导出名
  67.         output_dir: str = "output",
  68.         chunk_size=500,
  69.     ):
  70.         async def _func(llms, data):
  71.             """
  72.             异步请求处理一小块数据
  73.             """
  74.             results = [llms[i % len(llms)](text) for i, text in enumerate(data)]
  75.             with tqdm(total=len(results)) as pbar:
  76.                 results = await asyncio.gather(
  77.                     *[
  78.                         AsyncLLMAPI._run_task_with_progress(task, pbar)
  79.                         for task in results
  80.                     ]
  81.                 )
  82.             return results
  83.         idx = 0
  84.         all_df = []
  85.         while idx < len(data):
  86.             file = f"{idx}_{keyword}.csv"
  87.             file_dir = os.path.join(output_dir, file)
  88.             if os.path.exists(file_dir):
  89.                 print(f"{file_dir} already exist! Just skip.")
  90.                 tmp_df = pd.read_csv(file_dir)
  91.             else:
  92.                 tmp_data = data[idx : idx + chunk_size]
  93.                 loop = asyncio.get_event_loop()
  94.                 tmp_result = loop.run_until_complete(_func(llms=llms, data=tmp_data))
  95.                 tmp_result = [item.generations[0][0].text for item in tmp_result]
  96.                 tmp_df = pd.DataFrame({"infer": tmp_result})
  97.                 # 如果文件夹不存在,则创建
  98.                 if not os.path.exists(tmp_folder := os.path.dirname(file_dir)):
  99.                     os.makedirs(tmp_folder)
  100.                 tmp_df.to_csv(file_dir, index=False)
  101.             all_df.append(tmp_df)
  102.             idx += chunk_size
  103.         all_df = pd.concat(all_df)
  104.         all_df.to_csv(os.path.join(output_dir, f"all_{keyword}.csv"), index=False)
  105.         return all_df
  106. if __name__ == "__main__":
  107.     # 生成 数学计算数据集
  108.     texts = []
  109.     labels = []
  110.     for _ in range(1000):
  111.         text, label = generate_arithmetic_expression(2)
  112.         texts.append(text)
  113.         labels.append(label)
  114.     llm = AsyncLLMAPI(
  115.         base_url="http://localhost:{}/v1".format(os.environ.get("API_PORT", 8000)),
  116.         api_key="{}".format(os.environ.get("API_KEY", "0")),
  117.     )
  118.     AsyncLLMAPI.async_run(
  119.         [llm], texts, keyword="数学计算", output_dir="output", chunk_size=500
  120.     )
复制代码
使用异步类,在短时间内向对方服务器,发送大量的请求可能会导致服务器拒绝响应。
由于使用了异步的请求,则必须在全部的请求都完成后才能拿到结果。为了避免步伐中途瓦解导致前面的请求的数据丢失,故 使用 chunk_size 对请求的数据举行切分,每完成一块数据的请求则把该块数据保存到csv文件中。
本文使用 generate_arithmetic_expression 生成1000条数学计算式,调用大模型 API 完成计算。
运行效果如下:
原始的 1000 条数据,设置chunk_size为500,故拆分为2块500条,分批举行处理惩罚。

为了避免步伐崩垮,分批举行异步推理,若步伐瓦解了,可重新运行,步伐会从上一次瓦解的点重新运行。(要保证数据集输入的一致!)

最终的输出文件是 all_数学计算.csv ,它是全部分快csv文件的汇总。
项目开源

https://github.com/JieShenAI/csdn/tree/main/24/11/async_llm_api



  • vllm_api.yaml 是 llamafactory 的API摆设的配置;
  • core.py 是紧张代码;

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张国伟

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表