llama-index做一个简单查询工作流(rag)

打印 上一主题 下一主题

主题 1931|帖子 1931|积分 5793

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
目次
  一、导入本地大模型、embedding
  二、workflow
  三、导入数据
  四、工作流可视化
  五、输入工具,输出结果
  一、导入本地大模型、embedding

 代码如下(示例):导入的千问的0.5b大模型
或者使用api接口
  1. from llama_index.llms.zhipuai import ZhipuAI
  2. llms = ZhipuAI(model="glm-4")#api要设置
复制代码
  1. from llama_index.embeddings.huggingface import HuggingFaceEmbedding
  2. from llama_index.llms.huggingface import HuggingFaceLLM
  3. from llama_index.core import Settings
  4. llm=HuggingFaceLLM(
  5.     model_name="LLM_model/Qwen2.5-0.5B-Instruct",
  6.     tokenizer_name="LLM_model/Qwen2.5-0.5B-Instruct",
  7.     context_window=30000,
  8.     max_new_tokens=2000,
  9.     generate_kwargs={"temperature": 0.7, "top_k": 50, "top_p": 0.95},
  10.     device_map="auto")
复制代码
  # 设置embedding模型,后面进行向量库储存的时候需要用到
# 用魔搭下载,更快,改成本身的路径
  modelscope download --model AI-ModelScope/bge-large-zh-v1.5
  --local_dir LLM/LLM_model/embeddding-bge-large-zh-v1.5
  1. from llama_index.embeddings.huggingface import HuggingFaceEmbedding
  2. from llama_index.core import Settings
  3. Settings.embed_model =  HuggingFaceEmbedding(
  4.     model_name="LLM_model/embeddding-bge-large-zh-v1.5",trust_remote_code=True)
复制代码

二、workflow

工作流应该是这样:
制定计划、进行查询、汇聚答案,决定是否再继续查询
故我们可以界说两个变乱
  1. #事件
  2. from llama_index.core.workflow import Event
  3. class QueryPlanItemResult(Event):
  4.     """The result of a query plan item"""
  5.     query: str
  6.     result: str
  7. class ExecutedPlanEvent(Event):
  8.     """The result of a query plan"""
  9.     result: str
复制代码
同时因为我们要调用工具,所以我们要规范模型的输出,这个会输入到结构化输出的函数内里,规范大模型的输出,原理应该雷同这里https://blog.csdn.net/2303_77229879/article/details/143441989?spm=1001.2014.3001.5501
  1. from pydantic import BaseModel, Field
  2. from llama_index.core.workflow import Event
  3. class QueryPlanItem(Event):
  4.     """A single step in an execution plan for a RAG system."""
  5.     name: str = Field(description="The name of the tool to use.")
  6.     query: str = Field(description="A natural language search query for a RAG system.")
  7. class QueryPlan(BaseModel):
  8.     """A plan for a RAG system. After running the plan, we should have either enough information to answer the user's original query, or enough information to form a new query plan."""
  9.     items: list[QueryPlanItem] = Field(
  10.         description="A list of the QueryPlanItem objects in the plan."
  11.     )
  12. '''通过astructured传入模型即可得到结构化输出
  13. query_plan = await llm.astructured_predict(
  14.                 QueryPlan,
  15.                 self.planning_prompt,
  16.                 context=context_str,
  17.                 query=query,
  18.             )'''
复制代码
现在我们来界说工作流,一共有如下几个函数

  • 制定计划,或者竣事工作流
  • 执行函数
  • 汇聚结果
  1. class QueryPlanningWorkflow(Workflow):
  2.     #让模型进行规划根据上下文内容和用户输入
  3.     planning_prompt = PromptTemplate(
  4.         "Think step by step. Given an initial query, as well as information about the indexes you can query, return a plan for a RAG system.\n"
  5.         "The plan should be a list of QueryPlanItem objects, where each object contains a query.\n"
  6.         "The result of executing an entire plan should provide a result that is a substantial answer to the initial query, "
  7.         "or enough information to form a new query plan.\n"
  8.         "Sources you can query: {context}\n"
  9.         "Initial query: {query}\n"
  10.         "Plan:"
  11.     )
  12.     #根据目前结果,结束还是举行执行任务
  13.     decision_prompt = PromptTemplate(
  14.         "Given the following information, return a final response that satisfies the original query, or return 'PLAN' if you need to continue planning.\n"
  15.         "Original query: {query}\n"
  16.         "Current results: {results}\n"
  17.     )
  18.     @step
  19.     async def planning_step(
  20.         self, ctx: Context, ev: StartEvent | ExecutedPlanEvent
  21.     ) -> QueryPlanItem | StopEvent:
  22.         if isinstance(ev, StartEvent):
  23.             # stratevent的时候才会进行Initially, we need to plan
  24.             query = ev.get("query")
  25.             tools = ev.get("tools")
  26.             await ctx.set("tools", {t.metadata.name: t for t in tools})
  27.             await ctx.set("original_query", query)
  28.             # 将工具的信息作为上下文导入,使得模型看见工具
  29.             context_str = "\n".join(
  30.                 [
  31.                     f"{i+1}. {tool.metadata.name}: {tool.metadata.description}"
  32.                     for i, tool in enumerate(tools)
  33.                 ]
  34.             )
  35.             await ctx.set("context", context_str)
  36.             #planning_prompt导入,要求返回列表,列表里面是使用的工具和查询的自然语言
  37.             #这里有需要也可以吧大模型设置成类变量,我这里直接改成上面定义的全局变量了
  38.             query_plan = await llm.astructured_predict(
  39.                 QueryPlan,
  40.                 self.planning_prompt,
  41.                 context=context_str,
  42.                 query=query,
  43.             )
  44.             ctx.write_event_to_stream(
  45.                 Event(msg=f"Planning step: {query_plan}")
  46.             )
  47.             num_items = len(query_plan.items)
  48.             await ctx.set("num_items", num_items)
  49.             #返回事件
  50.             for item in query_plan.items:
  51.                 ctx.send_event(item)#将事件发送到工作流中的特定步骤。如果 step 为 None,则事件将发送给所有接收者,由它们决定是否丢弃不需要的事件。
  52.         else:
  53.             #ExecutedPlanEvent时会执行
  54.             query = await ctx.get("original_query")
  55.             current_results_str = ev.result
  56.             ##根据目前结果,结束还是举行执行任务
  57.             decision = await llm.apredict(
  58.                 self.decision_prompt,
  59.                 query=query,
  60.                 results=current_results_str,
  61.             )
  62.             # 简单的匹配,如果里面有plan就继续规划,否则就结束,疑惑:如果模型输出不需要plan,怎么结束?
  63.             if "PLAN" in decision:
  64.                 context_str = await ctx.get("context")
  65.                 query_plan = await llm.astructured_predict(
  66.                     QueryPlan,
  67.                     self.planning_prompt,
  68.                     context=context_str,
  69.                     query=query,
  70.                 )
  71.                 ctx.write_event_to_stream(
  72.                     Event(msg=f"Re-Planning step: {query_plan}")
  73.                 )
  74.                 num_items = len(query_plan.items)
  75.                 await ctx.set("num_items", num_items)
  76.                 for item in query_plan.items:
  77.                     ctx.send_event(item)
  78.             else:
  79.                 #如果没有plan就结束
  80.                 return StopEvent(result=decision)
  81.     @step(num_workers=4)
  82.     async def execute_item(
  83.         self, ctx: Context, ev: QueryPlanItem
  84.     ) -> QueryPlanItemResult:
  85.         tools = await ctx.get("tools")
  86.         tool = tools[ev.name]
  87.         ctx.write_event_to_stream(
  88.             Event(
  89.                 msg=f"Querying tool {tool.metadata.name} with query: {ev.query}"
  90.             )
  91.         )
  92.         result = await tool.acall(ev.query)#传入的工具定义的方法,直接把语句放到查询引擎中,然后返回结果
  93.         ctx.write_event_to_stream(
  94.             Event(msg=f"Tool {tool.metadata.name} returned: {result}")
  95.         )
  96.         return QueryPlanItemResult(query=ev.query, result=str(result))
  97.     @step
  98.     async def aggregate_results(
  99.         self, ctx: Context, ev: QueryPlanItemResult
  100.     ) -> ExecutedPlanEvent:
  101.         num_items = await ctx.get("num_items")
  102.         # 收集全部的事件才会结束,否则会一直等待
  103.         results = ctx.collect_events(ev, [QueryPlanItemResult] * num_items)
  104.         if results is None:
  105.             return
  106.         #汇总结果
  107.         aggregated_result = "\n------\n".join(
  108.             [
  109.                 f"{i+1}. {result.query}: {result.result}"
  110.                 for i, result in enumerate(results)
  111.             ]
  112.         )
  113.         return ExecutedPlanEvent(result=aggregated_result)
复制代码
使用glm-4可能出现的问题,我调用api接口一直返回的格式不精确,可能是 planning_prompt的的第三行只说返回一个query,这里改成a query and a name应该就能返回精确的格式了
按理来说prompt的问题的话qwen也会有同样的错误,但是虽然也有发生,重复两次也总是能执行
三、导入数据

使用llamaparse解析PDF文件,有一个文件夹,内里都是pdf,这里可以任意放几篇论文进去,然后可以通过循环界说不同的引擎,这里我只放了一个,所以名称就没修改,只有一个
  1. # 在这里,我们用于加载和解析文档,调用LlamaParseapi解析文件
  2. from llama_parse import LlamaParse
  3. api_key=''#去llama-cloud注册有免费额度的
  4. parser = LlamaParse(api_key=api_key,fast_mode=True)
  5. from llama_index.core import (
  6.     VectorStoreIndex,
  7.     StorageContext,
  8.     load_index_from_storage,
  9. )
  10. from llama_index.core.tools import QueryEngineTool
  11. folder = "./data_pdf/"
  12. files = os.listdir(folder)
  13. query_engine_tools = []
  14. for file in files:
  15.     #取名字第一个字符
  16.     number = file.split(" - ")[0]#根据论文名字的分隔符确定,防止文章太长
  17.     index_persist_path = f"./storage/paper-{number}/"
  18.     #创建储存路径,如果存在就加载,不存在就创建
  19.     if os.path.exists(index_persist_path):
  20.         storage_context = StorageContext.from_defaults(
  21.             persist_dir=index_persist_path
  22.         )
  23.         index = load_index_from_storage(storage_context)
  24.     else:
  25.         documents = await parser.aload_data(folder + file)
  26.         index = VectorStoreIndex.from_documents(documents)
  27.         index.storage_context.persist(index_persist_path)
  28.     #这里需要输入llm,不然会自动调用openai的大模型
  29.     engine = index.as_query_engine(llm=llm)
  30.     query_engine_tools.append(
  31.         QueryEngineTool.from_defaults(
  32.             engine,
  33.             name=f"TimeSeriesAnalysis",
  34.             description=f"This is a paper on time series analysis, which includes information on time series forecasting and attention mechanisms."
  35.         )
  36.     )
复制代码

如果tool名称太长或者,描述不精确都有可能会报错,返回格式不精确这种,好比pydanitc的报错或者ctx获取不到items(basemodel那个子类界说的名称)
四、工作流可视化

  1. from llama_index.utils.workflow import draw_all_possible_flows
  2. draw_all_possible_flows(
  3.     QueryPlanningWorkflow, filename="twwww.html"
  4. )
复制代码


五、输入工具,输出结果

这里是jupterbook内里操作,如果要在py内里运行需要界说一个main函数
  1. #测试工作流
  2. workflow = QueryPlanningWorkflow(verbose=False, timeout=120)
  3. # run the workflow
  4. handler = workflow.run(
  5.     query="什么是时间序列预测",
  6.     tools=query_engine_tools,
  7. )
  8. #流式处理
  9. async for event in handler.stream_events():
  10.     if hasattr(event, "msg"):
  11.         print(event.msg)
  12. result = await handler
复制代码
1.qwen0.5b输出
进行了多次尝试,输出结果都不是很抱负哈,prompt内里又是英语又是中文的,有点为难这个0.5b的大模型了,第一次答的还算沾点边,第二次不知道为啥干翻译去了,然后这个任务的名字他也常常堕落,返回错误的格式,就得不停重试和调解prompt

2.glm-4输出结果
glm这里输出就显着稳固多了,照旧需要调解prompt,而且这里没有加入重试的尝试,一般来说加上可以减少报错,这个prompt也可以调解调解,改成官方做functioncall的情势。



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

道家人

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表