道家人 发表于 2024-11-12 22:36:16

llama-index做一个简单查询工作流(rag)

目次
一、导入本地大模型、embedding
二、workflow
三、导入数据
四、工作流可视化
五、输入工具,输出结果
一、导入本地大模型、embedding

 代码如下(示例):导入的千问的0.5b大模型
或者使用api接口
from llama_index.llms.zhipuai import ZhipuAI
llms = ZhipuAI(model="glm-4")#api要设置 from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.huggingface import HuggingFaceLLM
from llama_index.core import Settings
llm=HuggingFaceLLM(
    model_name="LLM_model/Qwen2.5-0.5B-Instruct",
    tokenizer_name="LLM_model/Qwen2.5-0.5B-Instruct",
    context_window=30000,
    max_new_tokens=2000,
    generate_kwargs={"temperature": 0.7, "top_k": 50, "top_p": 0.95},
    device_map="auto")
   # 设置embedding模型,后面进行向量库储存的时候需要用到
# 用魔搭下载,更快,改成本身的路径
modelscope download --model AI-ModelScope/bge-large-zh-v1.5
--local_dir LLM/LLM_model/embeddding-bge-large-zh-v1.5
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import Settings
Settings.embed_model =HuggingFaceEmbedding(
    model_name="LLM_model/embeddding-bge-large-zh-v1.5",trust_remote_code=True)
二、workflow

工作流应该是这样:
制定计划、进行查询、汇聚答案,决定是否再继续查询
故我们可以界说两个变乱
#事件
from llama_index.core.workflow import Event
class QueryPlanItemResult(Event):
    """The result of a query plan item"""

    query: str
    result: str

class ExecutedPlanEvent(Event):
    """The result of a query plan"""

    result: str 同时因为我们要调用工具,所以我们要规范模型的输出,这个会输入到结构化输出的函数内里,规范大模型的输出,原理应该雷同这里https://blog.csdn.net/2303_77229879/article/details/143441989?spm=1001.2014.3001.5501
from pydantic import BaseModel, Field
from llama_index.core.workflow import Event
class QueryPlanItem(Event):
    """A single step in an execution plan for a RAG system."""
    name: str = Field(description="The name of the tool to use.")
    query: str = Field(description="A natural language search query for a RAG system.")

class QueryPlan(BaseModel):
    """A plan for a RAG system. After running the plan, we should have either enough information to answer the user's original query, or enough information to form a new query plan."""
    items: list = Field(
      description="A list of the QueryPlanItem objects in the plan."
    )
'''通过astructured传入模型即可得到结构化输出
query_plan = await llm.astructured_predict(
                QueryPlan,
                self.planning_prompt,
                context=context_str,
                query=query,
            )''' 现在我们来界说工作流,一共有如下几个函数

[*]制定计划,或者竣事工作流
[*]执行函数
[*]汇聚结果
class QueryPlanningWorkflow(Workflow):
    #让模型进行规划根据上下文内容和用户输入
    planning_prompt = PromptTemplate(
      "Think step by step. Given an initial query, as well as information about the indexes you can query, return a plan for a RAG system.\n"
      "The plan should be a list of QueryPlanItem objects, where each object contains a query.\n"
      "The result of executing an entire plan should provide a result that is a substantial answer to the initial query, "
      "or enough information to form a new query plan.\n"
      "Sources you can query: {context}\n"
      "Initial query: {query}\n"
      "Plan:"
    )
    #根据目前结果,结束还是举行执行任务
    decision_prompt = PromptTemplate(
      "Given the following information, return a final response that satisfies the original query, or return 'PLAN' if you need to continue planning.\n"
      "Original query: {query}\n"
      "Current results: {results}\n"
    )

    @step
    async def planning_step(
      self, ctx: Context, ev: StartEvent | ExecutedPlanEvent
    ) -> QueryPlanItem | StopEvent:
      if isinstance(ev, StartEvent):
            # stratevent的时候才会进行Initially, we need to plan
            query = ev.get("query")
            tools = ev.get("tools")
            await ctx.set("tools", {t.metadata.name: t for t in tools})
            await ctx.set("original_query", query)
            # 将工具的信息作为上下文导入,使得模型看见工具
            context_str = "\n".join(
                [
                  f"{i+1}. {tool.metadata.name}: {tool.metadata.description}"
                  for i, tool in enumerate(tools)
                ]
            )
            await ctx.set("context", context_str)
            #planning_prompt导入,要求返回列表,列表里面是使用的工具和查询的自然语言
            #这里有需要也可以吧大模型设置成类变量,我这里直接改成上面定义的全局变量了
            query_plan = await llm.astructured_predict(
                QueryPlan,
                self.planning_prompt,
                context=context_str,
                query=query,
            )
            ctx.write_event_to_stream(
                Event(msg=f"Planning step: {query_plan}")
            )
            num_items = len(query_plan.items)
            await ctx.set("num_items", num_items)
            #返回事件
            for item in query_plan.items:
                ctx.send_event(item)#将事件发送到工作流中的特定步骤。如果 step 为 None,则事件将发送给所有接收者,由它们决定是否丢弃不需要的事件。
      else:
            #ExecutedPlanEvent时会执行
            query = await ctx.get("original_query")
            current_results_str = ev.result
            ##根据目前结果,结束还是举行执行任务
            decision = await llm.apredict(
                self.decision_prompt,
                query=query,
                results=current_results_str,
            )

            # 简单的匹配,如果里面有plan就继续规划,否则就结束,疑惑:如果模型输出不需要plan,怎么结束?
            if "PLAN" in decision:
                context_str = await ctx.get("context")
                query_plan = await llm.astructured_predict(
                  QueryPlan,
                  self.planning_prompt,
                  context=context_str,
                  query=query,
                )
                ctx.write_event_to_stream(
                  Event(msg=f"Re-Planning step: {query_plan}")
                )
                num_items = len(query_plan.items)
                await ctx.set("num_items", num_items)
                for item in query_plan.items:
                  ctx.send_event(item)
            else:
                #如果没有plan就结束
                return StopEvent(result=decision)

    @step(num_workers=4)
    async def execute_item(
      self, ctx: Context, ev: QueryPlanItem
    ) -> QueryPlanItemResult:
      tools = await ctx.get("tools")
      tool = tools

      ctx.write_event_to_stream(
            Event(
                msg=f"Querying tool {tool.metadata.name} with query: {ev.query}"
            )
      )
      result = await tool.acall(ev.query)#传入的工具定义的方法,直接把语句放到查询引擎中,然后返回结果
      ctx.write_event_to_stream(
            Event(msg=f"Tool {tool.metadata.name} returned: {result}")
      )
      return QueryPlanItemResult(query=ev.query, result=str(result))

    @step
    async def aggregate_results(
      self, ctx: Context, ev: QueryPlanItemResult
    ) -> ExecutedPlanEvent:
      num_items = await ctx.get("num_items")
      # 收集全部的事件才会结束,否则会一直等待
      results = ctx.collect_events(ev, * num_items)
      if results is None:
            return
      #汇总结果
      aggregated_result = "\n------\n".join(
            [
                f"{i+1}. {result.query}: {result.result}"
                for i, result in enumerate(results)
            ]
      )
      return ExecutedPlanEvent(result=aggregated_result) 使用glm-4可能出现的问题,我调用api接口一直返回的格式不精确,可能是 planning_prompt的的第三行只说返回一个query,这里改成a query and a name应该就能返回精确的格式了
按理来说prompt的问题的话qwen也会有同样的错误,但是虽然也有发生,重复两次也总是能执行
三、导入数据

使用llamaparse解析PDF文件,有一个文件夹,内里都是pdf,这里可以任意放几篇论文进去,然后可以通过循环界说不同的引擎,这里我只放了一个,所以名称就没修改,只有一个
# 在这里,我们用于加载和解析文档,调用LlamaParseapi解析文件
from llama_parse import LlamaParse
api_key=''#去llama-cloud注册有免费额度的
parser = LlamaParse(api_key=api_key,fast_mode=True)

from llama_index.core import (
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage,
)
from llama_index.core.tools import QueryEngineTool

folder = "./data_pdf/"
files = os.listdir(folder)

query_engine_tools = []
for file in files:
    #取名字第一个字符
    number = file.split(" - ")#根据论文名字的分隔符确定,防止文章太长
    index_persist_path = f"./storage/paper-{number}/"
    #创建储存路径,如果存在就加载,不存在就创建
    if os.path.exists(index_persist_path):
      storage_context = StorageContext.from_defaults(
            persist_dir=index_persist_path
      )
      index = load_index_from_storage(storage_context)
    else:
      documents = await parser.aload_data(folder + file)
      index = VectorStoreIndex.from_documents(documents)
      index.storage_context.persist(index_persist_path)
    #这里需要输入llm,不然会自动调用openai的大模型
    engine = index.as_query_engine(llm=llm)
    query_engine_tools.append(
      QueryEngineTool.from_defaults(
            engine,
            name=f"TimeSeriesAnalysis",
            description=f"This is a paper on time series analysis, which includes information on time series forecasting and attention mechanisms."
      )
    )
如果tool名称太长或者,描述不精确都有可能会报错,返回格式不精确这种,好比pydanitc的报错或者ctx获取不到items(basemodel那个子类界说的名称)
四、工作流可视化

from llama_index.utils.workflow import draw_all_possible_flows
draw_all_possible_flows(
    QueryPlanningWorkflow, filename="twwww.html"
) https://i-blog.csdnimg.cn/direct/61cee1948274471f9a80c83b5211cda3.png

五、输入工具,输出结果

这里是jupterbook内里操作,如果要在py内里运行需要界说一个main函数
#测试工作流
workflow = QueryPlanningWorkflow(verbose=False, timeout=120)
# run the workflow
handler = workflow.run(
    query="什么是时间序列预测",
    tools=query_engine_tools,
)
#流式处理
async for event in handler.stream_events():
    if hasattr(event, "msg"):
      print(event.msg)
result = await handler 1.qwen0.5b输出
进行了多次尝试,输出结果都不是很抱负哈,prompt内里又是英语又是中文的,有点为难这个0.5b的大模型了,第一次答的还算沾点边,第二次不知道为啥干翻译去了,然后这个任务的名字他也常常堕落,返回错误的格式,就得不停重试和调解prompt
https://i-blog.csdnimg.cn/direct/d367bf0882f846f0bcdf74c00c8c45a1.jpeghttps://i-blog.csdnimg.cn/direct/5040db2e3f3247f58b8f1bbc48298108.png
2.glm-4输出结果
glm这里输出就显着稳固多了,照旧需要调解prompt,而且这里没有加入重试的尝试,一般来说加上可以减少报错,这个prompt也可以调解调解,改成官方做functioncall的情势。
https://i-blog.csdnimg.cn/direct/772307a4e1ef48c795b5b256b10883c8.png
https://i-blog.csdnimg.cn/direct/cff61f61e2784a62864913cb7f370c06.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: llama-index做一个简单查询工作流(rag)