齐备的出发点是一顿臭骂
上个月,我被领导叫进办公室骂了整整二非常钟。
因由是如许的——我们部门负责维护一套内部知识库体系,内里沉淀了公司近五年的技能文档、故障处置惩罚手册、另有各种规范流程。标题是,这玩意儿除了当摆设,险些没人用。为啥?由于搜索太烂了,关键词匹配的那种,你搜服务器宕机怎么办,它给你返回一堆包罗服务器的文档,真正有效的那篇反而排在第三页。
新同事入职问标题,老员工翻文档找答案,各人宁肯在群里@人问,也不乐意去知识库里查。
然后领导发话了:你不是每天研究什么大模子吗?能不能整个智能问答,让各人直接问标题就能得到答案?
我其时脑筋一热,拍胸脯说没标题。结果第一版上线三天就被骂下来了——用户问我们的MySQL主从切换流程是什么,大模子复兴得头头是道,但内容美满是它自己编的!跟我们公司的现实流程八竿子打不着。
这就是所谓的大模子幻觉标题,我其时对RAG的明确还停顿在把文档丢进去就行的程度,太灵活了。
不外,厥后的故事还算圆满。我花了快要三周时间重构了整个方案,如今这套体系已经成了部门的标配工具,月生动用户从0涨到了200多,领导在季度会上还专门表彰了一回。本日这篇文章,我就把整个踩坑过程原原来本地纪录下来,包罗代码、架构计划、以及那些教科书上不会告诉你的实战细节。
一、RAG到底在办理什么标题
在动手之前,我想先聊聊RAG这个概念,由于很多刚打仗的朋侪轻易搞混。
大模子很强,但它有两个致命缺点:
第一,知识有制止日期。 GPT-4的训练数据制止到某个时间点,它不知道你们公司上周发布的新规范,也不知道你们昨天刚修复的谁人bug是怎么办理的。
第二,会一本端庄地颠三倒四。 当大模子碰到它不知道的标题时,它不会老老实实说我不知道,而是会基于它学过的通用知识,给你编一个看起来很公道但实在是错的答案。这就是所谓的幻觉(Hallucination)。
RAG(Retrieval-Augmented Generation,检索增强天生)的核心思绪实在很简朴:别让大模子靠想象力答题,先帮它把参考资料找出来,让它照着资料复兴。
详细来说分三步:
- 把你的私有文档切成小块,转成向量存起来
- 用户提问时,先根据标题检索出最相干的文档片断
- 把标题和检索到的内容一起喂给大模子,让它基于这些质料天生答案
听起来不复杂对吧?我其时也是这么想的,然后就踩了一堆坑。
二、第一个大坑:文档切分没那么简朴
我最初的方案特别粗暴——用LangChain的RecursiveCharacterTextSplitter,设置chunk_size=500,overlap=50,直接把全部文档切成小块。
代码写起来确实很简朴:- from langchain.text_splitter import RecursiveCharacterTextSplitter
- def naive_split(text):
- 最初的简单切分方案——后来证明这是个坑
- splitter = RecursiveCharacterTextSplitter(
- chunk_size=500,
- chunk_overlap=50,
- separators=[\n\n, \n, 。, !, ?, , ]
- )
- chunks = splitter.split_text(text)
- return chunks
- # 测试一下
- sample_text =
- # MySQL主从切换操作手册
- ## 1. 前置检查
- 在执行主从切换之前,必须完成以下检查:
- - 确认从库同步状态正常(Seconds_Behind_Master = 0)
- - 确认没有正在执行的大事务
- - 通知相关业务方,确认切换时间窗口
- ## 2. 切换步骤
- 2.1 在主库执行只读设置
- SET GLOBAL read_only = 1;
- 2.2 等待从库完全同步
- 在从库执行 SHOW SLAVE STATUS,确认 Seconds_Behind_Master = 0
- 2.3 停止从库复制
- STOP SLAVE;
- RESET SLAVE ALL;
- ## 3. 回滚方案
- 如果切换失败,按以下步骤回滚...
- chunks = naive_split(sample_text)
- for i, chunk in enumerate(chunks):
- print(f Chunk {i+1} )
- print(chunk[:100] + ... if len(chunk) > 100 else chunk)
复制代码 看起来没毛病是吧?但现实用起来标题大了。
有一次用户问:MySQL切换前须要做哪些查抄?体系返回的文档片断是如许的:- 确认没有正在执行的大事务
- - 通知相关业务方,确认切换时间窗口
- ## 2. 切换步骤
- 2.1 在主库执行只读设置
- SET GLOBAL read_only = 1;
复制代码 发现标题了吗?这个片断恰好从查抄步调的中心切开了!第一条查抄项确认从库同步状态正常被切到了上一个chunk里。用户问的是须要做哪些查抄,结果我们给大模子的参考资料里,第一条查抄项就没包罗进去。
核心教导:机器地按字数切分,会打断文档的语义完备性。
厥后我改成了基于语义结构的切分计谋:- import re
- from typing import List, Dict
- class SmartDocumentSplitter:
-
- 语义感知的文档切分器
- 核心思路:尊重文档的原有结构,按标题、段落等语义边界切分
-
-
- def __init__(self, max_chunk_size=800, min_chunk_size=100):
- self.max_chunk_size = max_chunk_size
- self.min_chunk_size = min_chunk_size
-
- def split_markdown(self, text: str) -> List[Dict]:
-
- 针对Markdown文档的切分
- 保持标题层级结构,每个chunk都带上完整的上下文路径
-
- chunks = []
- current_headers = {1: , 2: , 3: } # 记录当前的标题层级
-
- # 按行处理,识别标题和内容
- lines = text.split('\n')
- current_content = []
-
- for line in lines:
- # 检测Markdown标题
- header_match = re.match(r'^(#{1,3})\s+(.+)$', line)
-
- if header_match:
- # 遇到新标题,先保存之前的内容
- if current_content:
- chunk_text = '\n'.join(current_content).strip()
- if len(chunk_text) >= self.min_chunk_size:
- chunks.append({
- 'content': chunk_text,
- 'headers': dict(current_headers),
- 'context_path': self._build_context_path(current_headers)
- })
- current_content = []
-
- # 更新标题层级
- level = len(header_match.group(1))
- title = header_match.group(2)
- current_headers[level] = title
-
- # 清除下级标题
- for l in range(level + 1, 4):
- current_headers[l] =
-
- current_content.append(line)
- else:
- current_content.append(line)
-
- # 如果当前内容超过最大长度,强制切分(但尽量在段落边界)
- content_so_far = '\n'.join(current_content)
- if len(content_so_far) > self.max_chunk_size:
- chunk_text = content_so_far.strip()
- chunks.append({
- 'content': chunk_text,
- 'headers': dict(current_headers),
- 'context_path': self._build_context_path(current_headers)
- })
- current_content = []
-
- # 别忘了最后一段
- if current_content:
- chunk_text = '\n'.join(current_content).strip()
- if len(chunk_text) >= self.min_chunk_size:
- chunks.append({
- 'content': chunk_text,
- 'headers': dict(current_headers),
- 'context_path': self._build_context_path(current_headers)
- })
-
- return chunks
-
- def _build_context_path(self, headers: Dict) -> str:
- 构建层级路径,比如:MySQL主从切换 > 前置检查
- path_parts = [h for h in [headers[1], headers[2], headers[3]] if h]
- return ' > '.join(path_parts) if path_parts else '未分类'
-
- def enrich_chunk_with_context(self, chunk: Dict) -> str:
-
- 关键技巧:给每个chunk加上上下文前缀
- 这样即使单独看这个片段,也能知道它属于哪个章节
-
- context = f[文档路径:{chunk['context_path']}]\n\n
- return context + chunk['content']
- # 实际使用示例
- splitter = SmartDocumentSplitter(max_chunk_size=800)
- chunks = splitter.split_markdown(sample_text)
- print(f切分后共 {len(chunks)} 个片段\n)
- for i, chunk in enumerate(chunks):
- print(f=== Chunk {i+1} ===)
- print(f路径:{chunk['context_path']})
- print(f内容预览:{chunk['content'][:150]}...)
- print()
复制代码 如许切出来的结果就很多多少了。每个chunk开头都会带上它的位置信息,大模子在复兴时能更正确地明确这段内容的上下文。
不外说实话,这个方案也不是万能的。对于那些格式不规范的老文档(没有清晰的标题结构),切分结果依然一样平常。厥后我又针对差别范例的文档做了差异化处置惩罚,这个我们背面再说。
三、第二个大坑:向量检索的语义鸿沟
办理了切分标题,下一步就是向量化和检索了。我用的是开源的BGE模子做Embedding,用Milvus做向量数据库。
第一版的检索代码很直白:- from sentence_transformers import SentenceTransformer
- from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
- import numpy as np
- class VectorStore:
- 向量存储和检索
-
- def __init__(self, model_name='BAAI/bge-base-zh-v1.5'):
- # 加载Embedding模型
- self.model = SentenceTransformer(model_name)
- self.dim = 768 # BGE base模型的向量维度
-
- # 连接Milvus
- connections.connect(default, host=localhost, port=19530)
-
- def create_collection(self, collection_name: str):
- 创建集合
- if utility.has_collection(collection_name):
- utility.drop_collection(collection_name)
-
- fields = [
- FieldSchema(name=id, dtype=DataType.INT64, is_primary=True, auto_id=True),
- FieldSchema(name=content, dtype=DataType.VARCHAR, max_length=4096),
- FieldSchema(name=context_path, dtype=DataType.VARCHAR, max_length=512),
- FieldSchema(name=embedding, dtype=DataType.FLOAT_VECTOR, dim=self.dim)
- ]
- schema = CollectionSchema(fields, description=知识库文档)
- collection = Collection(collection_name, schema)
-
- # 创建索引
- index_params = {
- metric_type: COSINE,
- index_type: IVF_FLAT,
- params: {nlist: 128}
- }
- collection.create_index(embedding, index_params)
- return collection
-
- def insert_documents(self, collection_name: str, chunks: list):
- 插入文档
- collection = Collection(collection_name)
-
- contents = [chunk['content'] for chunk in chunks]
- context_paths = [chunk['context_path'] for chunk in chunks]
-
- # 批量生成Embedding
- embeddings = self.model.encode(contents, normalize_embeddings=True)
-
- collection.insert([contents, context_paths, embeddings.tolist()])
- collection.flush()
- print(f成功插入 {len(chunks)} 条文档)
-
- def search(self, collection_name: str, query: str, top_k: int = 5):
- 基础检索
- collection = Collection(collection_name)
- collection.load()
-
- # 生成查询向量
- query_embedding = self.model.encode([query], normalize_embeddings=True)
-
- results = collection.search(
- data=query_embedding.tolist(),
- anns_field=embedding,
- param={metric_type: COSINE, params: {nprobe: 16}},
- limit=top_k,
- output_fields=[content, context_path]
- )
-
- return results[0]
复制代码 根本功能是没标题的。但现实跑起来,我发现了一个让人抓狂的征象——用户的口语化提问和文档的正式表述之间存在巨大的语义鸿沟。
举个例子:
- 用户问:数据库挂了怎么办
- 文档标题是:MySQL服务非常规复操纵手册
这两个在语义上是相干的,但向量相似度大概并不高。由于用户说的挂了和文档里的非常,用词差异很大。
更坑的是,偶尔候检索出的Top 5结果里,真正相干的那篇大概只排在第3或第4位,但前两名是一些看起来相干但现实上文不对题的内容。假如我只取Top 3喂给大模子,大概就遗漏了最关键的信息。
厥后我接纳了一个两阶段检索的计谋:先用向量检索做粗筛,再用重排序模子做精排。- from transformers import AutoModelForSequenceClassification, AutoTokenizer
- import torch
- class EnhancedRetriever:
- 增强版检索器:向量检索 + 重排序
-
- def __init__(self, vector_store: VectorStore):
- self.vector_store = vector_store
-
- # 加载重排序模型(BGE Reranker效果不错)
- self.reranker_tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-reranker-base')
- self.reranker_model = AutoModelForSequenceClassification.from_pretrained('BAAI/bge-reranker-base')
- self.reranker_model.eval()
-
- def retrieve_with_rerank(self, collection_name: str, query: str,
- initial_top_k: int = 20, final_top_k: int = 5):
-
- 两阶段检索:
- 1. 向量检索召回 initial_top_k 个候选
- 2. 用重排序模型精排,返回 final_top_k 个结果
-
- # 第一阶段:向量检索(召回更多候选)
- initial_results = self.vector_store.search(collection_name, query, top_k=initial_top_k)
-
- if not initial_results:
- return []
-
- # 准备重排序
- candidates = []
- for hit in initial_results:
- candidates.append({
- 'content': hit.entity.get('content'),
- 'context_path': hit.entity.get('context_path'),
- 'vector_score': hit.score # 保留向量检索得分,用于调试
- })
-
- # 第二阶段:重排序
- rerank_scores = self._compute_rerank_scores(query, [c['content'] for c in candidates])
-
- for i, score in enumerate(rerank_scores):
- candidates[i]['rerank_score'] = score
-
- # 按重排序分数排序
- candidates.sort(key=lambda x: x['rerank_score'], reverse=True)
-
- return candidates[:final_top_k]
-
- def _compute_rerank_scores(self, query: str, documents: list) -> list:
- 计算query和每个文档的相关性分数
- scores = []
-
- with torch.no_grad():
- for doc in documents:
- # Reranker的输入格式是 [query, document]
- inputs = self.reranker_tokenizer(
- [[query, doc]],
- padding=True,
- truncation=True,
- max_length=512,
- return_tensors='pt'
- )
- outputs = self.reranker_model(**inputs)
- score = outputs.logits.squeeze().item()
- scores.append(score)
-
- return scores
-
- def retrieve_with_query_expansion(self, collection_name: str, query: str,
- llm_client, top_k: int = 5):
-
- 进阶技巧:查询扩展
- 用大模型改写用户问题,生成多个变体,再合并检索结果
-
- # 让大模型帮我们扩展查询
- expansion_prompt = f请将下面这个问题改写成3个不同的表达方式,保持意思相同但用词不同。
- 每行输出一个改写结果,不要序号,不要其他解释。
- 原问题:{query}
-
- expanded_queries = llm_client.generate(expansion_prompt).strip().split('\n')
- expanded_queries = [q.strip() for q in expanded_queries if q.strip()]
-
- # 加上原始查询
- all_queries = [query] + expanded_queries[:3] # 最多取3个扩展查询
-
- print(f扩展后的查询:{all_queries}) # 调试用
-
- # 对每个查询分别检索
- all_candidates = {}
- for q in all_queries:
- results = self.vector_store.search(collection_name, q, top_k=10)
- for hit in results:
- content = hit.entity.get('content')
- if content not in all_candidates:
- all_candidates[content] = {
- 'content': content,
- 'context_path': hit.entity.get('context_path'),
- 'best_score': hit.score,
- 'hit_count': 1
- }
- else:
- # 被多个查询命中的文档,增加权重
- all_candidates[content]['hit_count'] += 1
- all_candidates[content]['best_score'] = max(
- all_candidates[content]['best_score'],
- hit.score
- )
-
- # 综合评分:命中次数 * 最高得分
- candidates = list(all_candidates.values())
- for c in candidates:
- c['combined_score'] = c['hit_count'] * c['best_score']
-
- candidates.sort(key=lambda x: x['combined_score'], reverse=True)
- return candidates[:top_k]
复制代码 查询扩展这招特别好用。好比用户问数据库挂了怎么办,大模子大概会扩展成:
- MySQL服务故障怎样处置惩罚
- 数据库无法毗连的办理方案
- 数据库宕机规复步调
这几个查询一起检索,能覆盖更多的相干文档。
四、第三个大坑:Prompt工程的门道比想象中深
检索的标题办理了,接下来就是把检索到的内容和用户标题一起喂给大模子了。这一步我本以为最简朴,没想到也踩了不少坑。
最初的Prompt特别淳厚:- def build_naive_prompt(query: str, context_docs: list) -> str:
- 最初的简单Prompt——后来证明太天真了
- context = \n\n.join([doc['content'] for doc in context_docs])
-
- prompt = f根据以下参考资料回答用户问题。
- 参考资料:
- {context}
- 用户问题:{query}
- 请回答:
-
- return prompt
复制代码 这个Prompt有几个严峻标题:
标题一:大模子不知道什么时间该说不知道。 当参考资料里确实没有答案时,它照旧会编一个出来。
标题二:没有引导大模子阐明信息泉源。 用户看到答案,不知道是从哪篇文档里来的,无法追溯和验证。
标题三:对于复杂标题,复兴的结构不敷清晰。
厥后迭代了很多版,终极稳固下来的Prompt是如许的:- def build_rag_prompt(query: str, context_docs: list,
- include_sources: bool = True) -> str:
-
- 生产环境使用的Prompt模板
- 关键设计:明确角色定位、限制回答范围、要求标注来源
-
-
- # 格式化上下文,每段都标注来源
- context_parts = []
- for i, doc in enumerate(context_docs, 1):
- source = doc.get('context_path', '未知来源')
- context_parts.append(f【资料{i},来源:{source}】\n{doc['content']})
-
- context = \n\n\n\n.join(context_parts)
-
- prompt = f你是一个企业内部知识库助手,专门帮助员工查找和理解公司内部文档。
- ## 你的工作准则
- 1. **只根据提供的参考资料回答问题**,不要使用你自己的知识。
- 2. 如果参考资料中没有相关信息,请明确说根据现有资料,我无法找到关于这个问题的信息,并建议用户联系相关部门或换个关键词搜索。
- 3. 回答时请标注信息来源,格式如【资料1】,方便用户追溯原文。
- 4. 对于操作类问题,请按步骤清晰地列出;对于概念类问题,先给出简明定义再展开解释。
- 5. 如果不同资料中的信息有冲突,请指出差异并说明各自的适用场景。
- ## 参考资料
- {context}
- ## 用户问题
- {query}
- ## 回答要求
- 请根据上述参考资料回答用户问题。记住:
- - 只使用参考资料中的信息
- - 标注信息来源
- - 没有把握的内容不要编造
-
- return prompt
- def build_conversational_prompt(query: str, context_docs: list,
- chat_history: list = None) -> str:
-
- 支持多轮对话的Prompt
- 需要带上历史对话记录,让大模型理解上下文
-
-
- context_parts = []
- for i, doc in enumerate(context_docs, 1):
- source = doc.get('context_path', '未知来源')
- context_parts.append(f【资料{i},来源:{source}】\n{doc['content']})
- context = \n\n\n\n.join(context_parts)
-
- # 格式化历史对话
- history_text =
- if chat_history:
- history_parts = []
- for turn in chat_history[-5:]: # 只保留最近5轮,避免太长
- history_parts.append(f用户:{turn['user']})
- history_parts.append(f助手:{turn['assistant']})
- history_text = \n.join(history_parts)
-
- prompt = f你是一个企业内部知识库助手。
- ## 参考资料
- {context}
- ## 对话历史
- {history_text if history_text else (这是对话的开始)}
- ## 当前问题
- 用户:{query}
- ## 回答准则
- 1. 优先根据参考资料回答,如无相关信息请明确说明
- 2. 考虑对话历史的上下文(如用户说它可能指代之前提到的概念)
- 3. 标注信息来源
- 助手:
-
- return prompt
复制代码 关于Prompt,我还想分享一个很告急的履历:不要试图在一个Prompt里塞太多指令。
一开始我把各种要求都写进去:复兴要正确、要简便、要友爱、要专业、要标注泉源、要分步调、碰到不确定要说不知道……结果发现模子反而被绕晕了,偶尔候顾了这个忘了谁人。
厥后我的做法是:区分核心指令和优化指令,核心指令必须生存,优化指令可以根据标题范例动态调解。- class PromptBuilder:
- Prompt构建器:根据问题类型动态调整
-
- # 核心指令——任何情况都必须包含
- CORE_INSTRUCTIONS =
- 1. 只使用参考资料中的信息回答,不要编造
- 2. 资料中没有的信息,明确说无法找到相关信息
- 3. 标注信息来源【资料X】
-
- # 操作类问题的额外指令
- PROCEDURE_INSTRUCTIONS =
- 回答格式要求:
- - 按步骤编号列出(第一步、第二步...)
- - 每个步骤要明确操作对象和操作动作
- - 重要的警告或注意事项用⚠️标出
-
- # 概念解释类问题的额外指令
- CONCEPT_INSTRUCTIONS =
- 回答格式要求:
- - 先用一句话给出核心定义
- - 再详细解释关键点
- - 如有必要,举例说明
- # 故障排查类问题的额外指令
- TROUBLESHOOT_INSTRUCTIONS =
- 回答格式要求:
- - 先列出可能的原因
- - 针对每个原因给出排查方法
- - 给出解决方案或规避建议
-
- @classmethod
- def build(cls, query: str, context_docs: list, question_type: str = general) -> str:
- 根据问题类型构建Prompt
-
- # 简单的问题分类逻辑(实际项目中可以用分类模型)
- if question_type == auto:
- question_type = cls._classify_question(query)
-
- extra_instructions =
- if question_type == procedure:
- extra_instructions = cls.PROCEDURE_INSTRUCTIONS
- elif question_type == concept:
- extra_instructions = cls.CONCEPT_INSTRUCTIONS
- elif question_type == troubleshoot:
- extra_instructions = cls.TROUBLESHOOT_INSTRUCTIONS
-
- context = cls._format_context(context_docs)
-
- prompt = f你是企业内部知识库助手。
- ## 必须遵守的规则
- {cls.CORE_INSTRUCTIONS}
- {f## 回答格式{extra_instructions} if extra_instructions else }
- ## 参考资料
- {context}
- ## 用户问题
- {query}
- 请回答:
-
- return prompt
-
- @classmethod
- def _classify_question(cls, query: str) -> str:
- 简单的问题分类(基于关键词)
- procedure_keywords = [怎么做, 如何操作, 步骤, 流程, 怎样]
- concept_keywords = [是什么, 什么是, 定义, 解释, 区别]
- troubleshoot_keywords = [为什么, 报错, 失败, 异常, 问题, 故障]
-
- query_lower = query.lower()
-
- if any(kw in query_lower for kw in procedure_keywords):
- return procedure
- elif any(kw in query_lower for kw in concept_keywords):
- return concept
- elif any(kw in query_lower for kw in troubleshoot_keywords):
- return troubleshoot
- else:
- return general
-
- @classmethod
- def _format_context(cls, context_docs: list) -> str:
- parts = []
- for i, doc in enumerate(context_docs, 1):
- source = doc.get('context_path', '未知来源')
- parts.append(f【资料{i},来源:{source}】\n{doc['content']})
- return \n\n\n\n.join(parts)
复制代码 五、串起来:完备的RAG Pipeline
前面说了一堆细节,如今把它们串成一个完备的Pipeline:
 - from openai import OpenAI
- from typing import List, Dict, Optional
- import json
- class RAGPipeline:
-
- 完整的RAG处理流程
- 文档切分 -> 向量化存储 -> 检索 -> 重排序 -> 生成回答
-
-
- def __init__(self,
- llm_base_url: str = https://api.deepseek.com,
- llm_api_key: str = your-api-key,
- llm_model: str = deepseek-chat):
-
- # 初始化各个组件
- self.splitter = SmartDocumentSplitter(max_chunk_size=800)
- self.vector_store = VectorStore()
- self.retriever = EnhancedRetriever(self.vector_store)
-
- # 初始化LLM客户端(这里用DeepSeek,也可以换成其他的)
- self.llm_client = OpenAI(base_url=llm_base_url, api_key=llm_api_key)
- self.llm_model = llm_model
-
- self.collection_name = knowledge_base
-
- def ingest_documents(self, documents: List[Dict]):
-
- 文档入库
- documents格式:[{title: 文档标题, content: 文档内容, source: 来源}]
-
- print(f开始处理 {len(documents)} 篇文档...)
-
- all_chunks = []
- for doc in documents:
- # 在内容前加上标题,帮助切分器识别结构
- full_content = f# {doc['title']}\n\n{doc['content']}
-
- chunks = self.splitter.split_markdown(full_content)
-
- # 给每个chunk加上文档来源信息
- for chunk in chunks:
- chunk['source_doc'] = doc.get('source', doc['title'])
-
- all_chunks.extend(chunks)
-
- print(f切分后共 {len(all_chunks)} 个片段)
-
- # 创建集合并插入
- self.vector_store.create_collection(self.collection_name)
- self.vector_store.insert_documents(self.collection_name, all_chunks)
-
- print(文档入库完成!)
-
- def query(self,
- question: str,
- chat_history: Optional[List[Dict]] = None,
- top_k: int = 5,
- use_rerank: bool = True) -> Dict:
-
- 处理用户查询
- 返回:{answer: 回答内容, sources: [引用的来源], retrieved_docs: [检索到的文档]}
-
-
- # 1. 检索相关文档
- if use_rerank:
- retrieved_docs = self.retriever.retrieve_with_rerank(
- self.collection_name, question,
- initial_top_k=20, final_top_k=top_k
- )
- else:
- results = self.vector_store.search(self.collection_name, question, top_k=top_k)
- retrieved_docs = [{
- 'content': hit.entity.get('content'),
- 'context_path': hit.entity.get('context_path'),
- 'score': hit.score
- } for hit in results]
-
- if not retrieved_docs:
- return {
- answer: 抱歉,我没有找到与您问题相关的资料。您可以尝试换个关键词,或联系相关部门获取帮助。,
- sources: [],
- retrieved_docs: []
- }
-
- # 2. 构建Prompt
- if chat_history:
- prompt = build_conversational_prompt(question, retrieved_docs, chat_history)
- else:
- prompt = PromptBuilder.build(question, retrieved_docs, question_type=auto)
-
- # 3. 调用LLM生成回答
- response = self.llm_client.chat.completions.create(
- model=self.llm_model,
- messages=[{role: user, content: prompt}],
- temperature=0.3, # 知识库问答用较低的temperature
- max_tokens=2000
- )
-
- answer = response.choices[0].message.content
-
- # 4. 提取引用的来源
- sources = list(set([doc.get('context_path', '未知来源') for doc in retrieved_docs]))
-
- return {
- answer: answer,
- sources: sources,
- retrieved_docs: retrieved_docs
- }
-
- def evaluate_response(self, question: str, answer: str,
- ground_truth: str = None) -> Dict:
-
- 回答质量评估(可选)
- 用LLM评估回答的质量,方便持续优化
-
- eval_prompt = f请评估以下问答的质量。
- 问题:{question}
- 回答:{answer}
- {f参考答案:{ground_truth} if ground_truth else }
- 请从以下维度评分(1-5分)并说明理由:
- 1. 相关性:回答是否切题
- 2. 准确性:信息是否正确
- 3. 完整性:是否完整解答了问题
- 4. 可读性:表述是否清晰易懂
- 请用JSON格式输出:{{relevance: 分数, accuracy: 分数, completeness: 分数, readability: 分数, comments: 评价说明}}
-
- response = self.llm_client.chat.completions.create(
- model=self.llm_model,
- messages=[{role: user, content: eval_prompt}],
- temperature=0
- )
-
- try:
- eval_result = json.loads(response.choices[0].message.content)
- return eval_result
- except:
- return {error: 评估结果解析失败}
- # 使用示例
- if __name__ == __main__:
- # 初始化Pipeline
- rag = RAGPipeline(
- llm_base_url=https://api.deepseek.com,
- llm_api_key=your-api-key,
- llm_model=deepseek-chat
- )
-
- # 准备测试文档
- test_documents = [
- {
- title: MySQL主从切换操作手册,
- content: """
- ## 1. 前置检查
- 在执行主从切换之前,必须完成以下检查:
- - 确认从库同步状态正常(Seconds_Behind_Master = 0)
- - 确认没有正在执行的大事务
- - 通知相关业务方,确认切换时间窗口
- ## 2. 切换步骤
- ### 2.1 在主库执行只读设置
- SET GLOBAL read_only = 1;
- ### 2.2 等待从库完全同步
- 在从库执行 SHOW SLAVE STATUS,确认 Seconds_Behind_Master = 0
- ### 2.3 停止从库复制并提升为主库
- STOP SLAVE;
- RESET SLAVE ALL;
- SET GLOBAL read_only = 0;
- ## 3. 切换后验证
- - 确认新主库可以正常写入
- - 确认应用连接已切换到新主库
- - 监控新主库的性能指标
- """,
- source: DBA团队文档
- }
- ]
-
- # 入库
- rag.ingest_documents(test_documents)
-
- # 测试查询
- result = rag.query(MySQL切换前需要做哪些检查?)
-
- print(= * 50)
- print(问题:MySQL切换前需要做哪些检查?)
- print(= * 50)
- print(f\n回答:\n{result['answer']})
- print(f\n参考来源:{result['sources']})
复制代码 六、上线后的一些履历教导
体系上线到如今差不多两个月了,期间又踩了不少坑,这里挑几个印象最深的说说。
教导一:用户的标题光怪陆离
我们在计划时假设用户会问MySQL怎么做主从切换这种正常标题。但现实上呢?
有人问:前次谁人变乱怎么处置惩罚的来着?——没有任何上下文,体系根本不知道谁人变乱是哪个。
有人问:帮我写个SQL。——这根本不是知识库问答,这是让大模子帮写代码。
另有人问:在吗?——我也不知道他想干啥。
厥后我加了一个意图辨认层,先判定用户的标题是否属于知识库问答的范畴:- def classify_intent(self, query: str) -> str:
- """识别用户意图"""
- intent_prompt = f"""判断用户输入的意图类别,只输出类别名称:
- - knowledge_query:查询知识库信息(如询问流程、规范、操作方法)
- - code_request:请求生成代码
- - chitchat:闲聊或无明确意图
- - other:其他
- 用户输入:{query}
- 意图类别:"""
-
- response = self.llm_client.chat.completions.create(
- model=self.llm_model,
- messages=[{"role": "user", "content": intent_prompt}],
- temperature=0,
- max_tokens=20
- )
-
- return response.choices[0].message.content.strip()
复制代码 对于非知识库问答的意图,给用户一个友爱的提示而不是硬着头皮检索。
教导二:冷启动时的尴尬
体系刚上线时,知识库里的文档不多,覆盖的场景有限。用户问了几个标题都答不上来,体验特别差,于是就不来用了。
厥后的办理办法:
- 上线前先梳理高频标题,确保至少这些标题能复兴好
- 搞了一个标题网络功能,对于答不上来的标题,纪录下来反馈给内容团队,让他们增补相干文档
- 做了一个兜底计谋——假如检索不到高相干度的内容,就展示相干保举,把一些相似度尚可的文档标题列出来,引导用户自己去看
教导三:文档更新的同步标题
知识库的文档是会更新的。老版本的操纵手册废弃了,新版本发布了。但假如向量数据库里还存着老版本的内容,用户检索到的大概是过期信息。
这个标题提及来简朴,做起来挺贫苦的。我们末了的方案是:
- 每个文档入库时纪录版本号和更新时间
- 定期全量重新入库(我们是每周一次)
- 对于告急更新的告急文档,支持手动触发单篇重入库
七、性能优化:让体系不那么慢
RAG体系有个让人头疼的标题——慢。
整个流程跑一遍:Embedding编码、向量检索、重排序、LLM天生,全部加起来大概要好几秒。用户体验就很差,问一个标题要等半天。
几个优化步调:- import asyncio
- from functools import lru_cache
- import hashlib
- class OptimizedRAG:
- 性能优化版RAG
-
- def __init__(self):
- # 缓存热门查询的结果
- self.query_cache = {}
- self.cache_ttl = 3600 # 1小时过期
-
- @lru_cache(maxsize=1000)
- def _compute_query_embedding(self, query: str):
-
- Embedding结果缓存
- 同样的问题不用重复计算向量
-
- return self.model.encode([query], normalize_embeddings=True)[0]
-
- def _get_cache_key(self, query: str) -> str:
- 生成缓存key
- return hashlib.md5(query.lower().strip().encode()).hexdigest()
-
- async def stream_query(self, question: str):
-
- 流式输出
- 不用等整个回答生成完,边生成边输出
-
- retrieved_docs = await asyncio.to_thread(
- self.retriever.retrieve_with_rerank,
- self.collection_name, question, 20, 5
- )
-
- prompt = PromptBuilder.build(question, retrieved_docs, question_type=auto)
-
- # 使用流式API
- stream = self.llm_client.chat.completions.create(
- model=self.llm_model,
- messages=[{role: user, content: prompt}],
- temperature=0.3,
- stream=True # 开启流式
- )
-
- for chunk in stream:
- if chunk.choices[0].delta.content:
- yield chunk.choices[0].delta.content
复制代码 流式输出这一点特别告急。用户问完标题后,立刻就能看到复兴在打字,生理上就不会以为那么慢了。
八、回顾与思考
把这套体系从被骂下线到成为部门标配,前后折腾了快要一个月。趟过的坑挺多,但劳绩也很大。
几点核心总结:
1. RAG不是万能的,选好实用场景
RAG得当有明确知识库、答案可追溯的场景。假如你的需求是让大模子发挥创造力(好比写文案、做创意),那RAG反而是个束缚。
2. 切分和检索是根基
各人每每把注意力放在大模子自己,以为用更强的模子就能办理标题。但现实上,假如前面的切分和检索做得欠好,再强的模子也是巧妇难为无米之炊。
3. Prompt工程真的是门技术
同样的检索结果,差别的Prompt大概带来天壤之别的复兴结果。这个没什么捷径,就是多试、多看、多迭代。
4. 上线只是开始
真正的寻衅在上线之后。用户的各种奇葩输入、文档的连续更新、性能的优化、结果的监控……每一项都是连续的工作。
末了,附上这套体系如今的一些核心指标:
- 日均查询量:200+次
- 匀称相应时间:2.3秒(开启流式后首字符耽误约0.8秒)
- 用户满意度(通过复兴后的点赞/点踩网络):约72%
- 无法复兴的比例:约22%(这部门会定期分析,推动增补文档)
数字不算特别亮眼,但比起之前谁人没人用的关键词搜索,已经是质的飞跃了。假如你也在做类似的项目,渴望这篇文章能帮你少踩一些坑。有标题欢迎品评区交换!
免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金. |