Building a simple query workflow (RAG) with llama-index
Contents
1. Loading the local LLM and embedding model
2. Workflow
3. Loading data
4. Workflow visualization
5. Passing in tools and getting results
1. Loading the local LLM and embedding model
The code below (an example) loads the Qwen 0.5B model. Alternatively, you can use an API:

from llama_index.llms.zhipuai import ZhipuAI
llms = ZhipuAI(model="glm-4")  # requires an API key
from llama_index.llms.huggingface import HuggingFaceLLM
from llama_index.core import Settings

llm = HuggingFaceLLM(
    model_name="LLM_model/Qwen2.5-0.5B-Instruct",
    tokenizer_name="LLM_model/Qwen2.5-0.5B-Instruct",
    context_window=30000,
    max_new_tokens=2000,
    generate_kwargs={"temperature": 0.7, "top_k": 50, "top_p": 0.95},
    device_map="auto",
)
# Set the embedding model; it is needed later when storing to the vector store.
# Download via ModelScope (faster); change the path to your own:
modelscope download --model AI-ModelScope/bge-large-zh-v1.5 \
    --local_dir LLM/LLM_model/embeddding-bge-large-zh-v1.5

from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import Settings

Settings.embed_model = HuggingFaceEmbedding(
    model_name="LLM_model/embeddding-bge-large-zh-v1.5", trust_remote_code=True)
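The embedding model is what the vector store uses to rank document chunks against a query. As a minimal, dependency-free sketch of that principle (the vectors below are made up; in practice they would come from the embedding model):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm

# Toy vectors standing in for real embeddings of a query and two chunks
query_vec = [1.0, 0.0, 1.0]
chunks = {"chunk_a": [0.9, 0.1, 0.8], "chunk_b": [0.0, 1.0, 0.1]}

# Retrieval = pick the chunk whose embedding is closest to the query's
best = max(chunks, key=lambda name: cosine_similarity(query_vec, chunks[name]))
print(best)  # chunk_a is more aligned with the query, so it is retrieved first
```

Real vector stores add indexing structures on top, but the ranking criterion is the same idea.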
2. Workflow
The workflow should look like this: make a plan, run the queries, aggregate the answers, then decide whether to keep querying. We can therefore define two events:
# Events
from llama_index.core.workflow import Event

class QueryPlanItemResult(Event):
    """The result of a query plan item"""
    query: str
    result: str

class ExecutedPlanEvent(Event):
    """The result of a query plan"""
    result: str

Because we will be calling tools, we also need to constrain the model's output. The classes below are fed into the structured-output machinery to enforce the format of the LLM's response; the principle should be similar to https://blog.csdn.net/2303_77229879/article/details/143441989?spm=1001.2014.3001.5501
from pydantic import BaseModel, Field
from llama_index.core.workflow import Event

class QueryPlanItem(Event):
    """A single step in an execution plan for a RAG system."""
    name: str = Field(description="The name of the tool to use.")
    query: str = Field(description="A natural language search query for a RAG system.")

class QueryPlan(BaseModel):
    """A plan for a RAG system. After running the plan, we should have either
    enough information to answer the user's original query, or enough
    information to form a new query plan."""
    items: list[QueryPlanItem] = Field(
        description="A list of the QueryPlanItem objects in the plan."
    )
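The principle behind structured prediction can be illustrated without llama-index: the LLM is prompted to emit JSON matching the schema, and the response is then parsed and validated into typed objects. A dependency-free sketch of that idea (the raw string stands in for a model response):

```python
import json
from dataclasses import dataclass

@dataclass
class PlanItem:
    name: str   # which tool to call
    query: str  # natural-language query for that tool

# Pretend this JSON came back from the LLM
raw = '{"items": [{"name": "TimeSeriesAnalysis", "query": "What is time series forecasting?"}]}'

# "Structured output" = parse + validate the response into typed objects;
# a malformed response would raise here instead of silently passing through
plan = [PlanItem(**item) for item in json.loads(raw)["items"]]
print(plan[0].name)  # TimeSeriesAnalysis
```

pydantic does the same job with richer validation, which is why a malformed model response surfaces as a validation error.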
Passing this class to the model via astructured_predict yields structured output:

query_plan = await llm.astructured_predict(
    QueryPlan,
    self.planning_prompt,
    context=context_str,
    query=query,
)

Now let's define the workflow. It consists of the following steps:
- Make a plan, or end the workflow
- Execute the plan items
- Aggregate the results
from llama_index.core import PromptTemplate
from llama_index.core.workflow import (
    Workflow,
    Context,
    StartEvent,
    StopEvent,
    step,
)

class QueryPlanningWorkflow(Workflow):
    # Ask the model to make a plan from the context and the user's input
    planning_prompt = PromptTemplate(
        "Think step by step. Given an initial query, as well as information about the indexes you can query, return a plan for a RAG system.\n"
        "The plan should be a list of QueryPlanItem objects, where each object contains a query.\n"
        "The result of executing an entire plan should provide a result that is a substantial answer to the initial query, "
        "or enough information to form a new query plan.\n"
        "Sources you can query: {context}\n"
        "Initial query: {query}\n"
        "Plan:"
    )
    # Based on the current results, decide whether to finish or keep executing
    decision_prompt = PromptTemplate(
        "Given the following information, return a final response that satisfies the original query, or return 'PLAN' if you need to continue planning.\n"
        "Original query: {query}\n"
        "Current results: {results}\n"
    )
    @step
    async def planning_step(
        self, ctx: Context, ev: StartEvent | ExecutedPlanEvent
    ) -> QueryPlanItem | StopEvent:
        if isinstance(ev, StartEvent):
            # On a StartEvent we plan for the first time
            query = ev.get("query")
            tools = ev.get("tools")
            await ctx.set("tools", {t.metadata.name: t for t in tools})
            await ctx.set("original_query", query)
            # Put the tools' metadata into the context so the model can see them
            context_str = "\n".join(
                [
                    f"{i+1}. {tool.metadata.name}: {tool.metadata.description}"
                    for i, tool in enumerate(tools)
                ]
            )
            await ctx.set("context", context_str)
            # Feed planning_prompt to the model and require a list back, where
            # each item names a tool and carries a natural-language query.
            # The LLM could also be made a class attribute; here it is simply
            # the global `llm` defined above.
            query_plan = await llm.astructured_predict(
                QueryPlan,
                self.planning_prompt,
                context=context_str,
                query=query,
            )
            ctx.write_event_to_stream(
                Event(msg=f"Planning step: {query_plan}")
            )
            num_items = len(query_plan.items)
            await ctx.set("num_items", num_items)
            # Emit one event per plan item. send_event dispatches the event to
            # the matching step in the workflow; if step is None, the event is
            # sent to all receivers, which discard the ones they don't handle.
            for item in query_plan.items:
                ctx.send_event(item)
        else:
            # Runs on an ExecutedPlanEvent
            query = await ctx.get("original_query")
            current_results_str = ev.result
            # Based on the current results, decide whether to finish or keep executing
            decision = await llm.apredict(
                self.decision_prompt,
                query=query,
                results=current_results_str,
            )
            # Simple string match: if "PLAN" appears, keep planning; otherwise
            # stop. (Open question: how does this terminate cleanly if the
            # final answer itself happens to contain "PLAN"?)
            if "PLAN" in decision:
                context_str = await ctx.get("context")
                query_plan = await llm.astructured_predict(
                    QueryPlan,
                    self.planning_prompt,
                    context=context_str,
                    query=query,
                )
                ctx.write_event_to_stream(
                    Event(msg=f"Re-Planning step: {query_plan}")
                )
                num_items = len(query_plan.items)
                await ctx.set("num_items", num_items)
                for item in query_plan.items:
                    ctx.send_event(item)
            else:
                # No "PLAN" in the response: finish
                return StopEvent(result=decision)
    @step(num_workers=4)
    async def execute_item(
        self, ctx: Context, ev: QueryPlanItem
    ) -> QueryPlanItemResult:
        tools = await ctx.get("tools")
        tool = tools[ev.name]  # look up the tool named in the plan item
        ctx.write_event_to_stream(
            Event(
                msg=f"Querying tool {tool.metadata.name} with query: {ev.query}"
            )
        )
        # The tool wraps a query engine: pass the query string in, get a response back
        result = await tool.acall(ev.query)
        ctx.write_event_to_stream(
            Event(msg=f"Tool {tool.metadata.name} returned: {result}")
        )
        return QueryPlanItemResult(query=ev.query, result=str(result))
    @step
    async def aggregate_results(
        self, ctx: Context, ev: QueryPlanItemResult
    ) -> ExecutedPlanEvent:
        num_items = await ctx.get("num_items")
        # Wait until all item results have arrived; until then collect_events
        # returns None and the step simply keeps waiting
        results = ctx.collect_events(ev, [QueryPlanItemResult] * num_items)
        if results is None:
            return
        # Aggregate the results
        aggregated_result = "\n------\n".join(
            [
                f"{i+1}. {result.query}: {result.result}"
                for i, result in enumerate(results)
            ]
        )
        return ExecutedPlanEvent(result=aggregated_result)

A possible issue with glm-4: when I called it through the API it kept returning the wrong format. The cause may be that the second line of planning_prompt only says each item contains a query; changing that to "a query and a name" should get the correct format back.
In principle, if the prompt were the problem, Qwen should hit the same error. It does happen occasionally, but after one or two retries it always manages to run.
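Since malformed structured output is the main failure mode here, a simple retry loop around the structured call can help. A sketch, assuming an async callable like `llm.astructured_predict` that raises a validation error on bad output (all names here are hypothetical, not llama-index API):

```python
import asyncio

async def predict_with_retry(predict_plan, max_retries: int = 3):
    """Retry a structured-prediction coroutine until it parses, up to a limit."""
    last_err = None
    for _ in range(max_retries):
        try:
            return await predict_plan()
        except ValueError as e:  # pydantic's ValidationError subclasses ValueError
            last_err = e
    raise RuntimeError(f"no valid plan after {max_retries} tries") from last_err

# Demo with a flaky stand-in that fails once, then succeeds
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 2:
        raise ValueError("malformed output")
    return {"items": ["ok"]}

plan = asyncio.run(predict_with_retry(flaky))
print(plan)  # {'items': ['ok']}
```

In the workflow above, `predict_plan` would be a lambda wrapping the `llm.astructured_predict(QueryPlan, ...)` call.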
3. Loading data
Use LlamaParse to parse the PDF files. There is a folder full of PDFs; you can drop a few papers in and define a separate engine for each one in a loop. I only put one paper in, so I left the name unchanged and there is only one engine.
# Load and parse the documents by calling the LlamaParse API
import os

from llama_parse import LlamaParse

api_key = ""  # register at llama-cloud; there is a free quota
parser = LlamaParse(api_key=api_key, fast_mode=True)

from llama_index.core import (
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage,
)
from llama_index.core.tools import QueryEngineTool

folder = "./data_pdf/"
files = os.listdir(folder)
query_engine_tools = []
for file in files:
    # Keep only the first segment of the filename (split on the " - " in the
    # paper title) so the storage path doesn't get too long
    number = file.split(" - ")[0]
    index_persist_path = f"./storage/paper-{number}/"
    # Load the index from the storage path if it exists, otherwise build and persist it
    if os.path.exists(index_persist_path):
        storage_context = StorageContext.from_defaults(
            persist_dir=index_persist_path
        )
        index = load_index_from_storage(storage_context)
    else:
        documents = await parser.aload_data(folder + file)
        index = VectorStoreIndex.from_documents(documents)
        index.storage_context.persist(index_persist_path)
    # Pass the llm explicitly, otherwise OpenAI's model is used by default
    engine = index.as_query_engine(llm=llm)
    query_engine_tools.append(
        QueryEngineTool.from_defaults(
            engine,
            name="TimeSeriesAnalysis",
            description="This is a paper on time series analysis, which includes information on time series forecasting and attention mechanisms.",
        )
    )
If a tool name is too long, or a description is inaccurate, you may get format errors: for example a pydantic validation error, or ctx failing to get items (the field name defined on the BaseModel subclass).
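One way to avoid the name problems mentioned above is to derive short, identifier-style tool names from the filenames. A sketch (the helper is hypothetical, not part of llama-index):

```python
import re

def make_tool_name(filename: str, max_len: int = 40) -> str:
    """Turn a paper filename into a short, identifier-style tool name."""
    stem = filename.rsplit(".", 1)[0]   # drop the extension
    stem = stem.split(" - ")[0]         # keep only the first title segment
    name = re.sub(r"[^0-9A-Za-z_]+", "_", stem).strip("_")
    return name[:max_len] or "paper"    # fall back if nothing survives

print(make_tool_name("Attention Is All You Need - Vaswani et al.pdf"))
# Attention_Is_All_You_Need
```

The result could be passed as the `name=` argument to `QueryEngineTool.from_defaults` in the loop above.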
4. Workflow visualization

from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(
    QueryPlanningWorkflow, filename="twwww.html"
)

https://i-blog.csdnimg.cn/direct/61cee1948274471f9a80c83b5211cda3.png
5. Passing in tools and getting results
The following runs in a Jupyter notebook; to run it from a .py file you need to define a main function.
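As a self-contained sketch of that entry-point pattern, with the actual workflow call replaced by a stand-in coroutine (in a real script you would build `QueryPlanningWorkflow` and await its handler inside `main`):

```python
import asyncio

# Stand-in for workflow.run(...): an awaitable that produces a result
async def run_workflow(query: str) -> str:
    await asyncio.sleep(0)  # pretend the planning/query steps happen here
    return f"answer to: {query}"

async def main() -> str:
    # In the real script this body would be:
    #   workflow = QueryPlanningWorkflow(verbose=False, timeout=120)
    #   handler = workflow.run(query=..., tools=query_engine_tools)
    #   async for event in handler.stream_events(): print(event.msg)
    #   return await handler
    return await run_workflow("What is time series forecasting?")

result = asyncio.run(main())
print(result)  # answer to: What is time series forecasting?
```

`asyncio.run` provides the event loop that Jupyter supplies implicitly, which is why the notebook version below can use top-level `await` directly.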
# Test the workflow
workflow = QueryPlanningWorkflow(verbose=False, timeout=120)

# Run the workflow
handler = workflow.run(
    query="什么是时间序列预测",  # "What is time series forecasting?"
    tools=query_engine_tools,
)

# Stream the intermediate events
async for event in handler.stream_events():
    if hasattr(event, "msg"):
        print(event.msg)
result = await handler

1. Qwen 0.5B output
After several attempts, the output was never great. The prompt mixes English and Chinese, which is asking a lot of a 0.5B model. The first answer was at least in the right neighborhood; the second one inexplicably turned into a translation exercise. It also often got the tool name wrong and returned a malformed format, so I had to keep retrying and tuning the prompt.
https://i-blog.csdnimg.cn/direct/d367bf0882f846f0bcdf74c00c8c45a1.jpeg
https://i-blog.csdnimg.cn/direct/5040db2e3f3247f58b8f1bbc48298108.png
2. glm-4 output
glm's output here was noticeably more stable, though the prompt still needed tuning, and no retries were added in this run; adding them generally reduces errors. The prompt could also be reworked into the official function-call style.
https://i-blog.csdnimg.cn/direct/772307a4e1ef48c795b5b256b10883c8.png
https://i-blog.csdnimg.cn/direct/cff61f61e2784a62864913cb7f370c06.png