大模子RAG实战,从被骂不靠谱到成为部门MVP,这是我的踩坑全纪录 [复制链接]
发表于 2026-2-23 19:43:07 | 显示全部楼层 |阅读模式
齐备的出发点是一顿臭骂

上个月,我被领导叫进办公室骂了整整二非常钟。
因由是如许的——我们部门负责维护一套内部知识库体系,内里沉淀了公司近五年的技能文档、故障处置惩罚手册、另有各种规范流程。标题是,这玩意儿除了当摆设,险些没人用。为啥?由于搜索太烂了,关键词匹配的那种,你搜服务器宕机怎么办,它给你返回一堆包罗服务器的文档,真正有效的那篇反而排在第三页。
新同事入职问标题,老员工翻文档找答案,各人宁肯在群里@人问,也不乐意去知识库里查。
然后领导发话了:你不是每天研究什么大模子吗?能不能整个智能问答,让各人直接问标题就能得到答案?
我其时脑筋一热,拍胸脯说没标题。结果第一版上线三天就被骂下来了——用户问我们的MySQL主从切换流程是什么,大模子复兴得头头是道,但内容美满是它自己编的!跟我们公司的现实流程八竿子打不着。
这就是所谓的大模子幻觉标题,我其时对RAG的明确还停顿在把文档丢进去就行的程度,太灵活了。
不外,厥后的故事还算圆满。我花了快要三周时间重构了整个方案,如今这套体系已经成了部门的标配工具,月生动用户从0涨到了200多,领导在季度会上还专门表彰了一回。本日这篇文章,我就把整个踩坑过程原原来本地纪录下来,包罗代码、架构计划、以及那些教科书上不会告诉你的实战细节。
一、RAG到底在办理什么标题

在动手之前,我想先聊聊RAG这个概念,由于很多刚打仗的朋侪轻易搞混。

大模子很强,但它有两个致命缺点:
第一,知识有制止日期。 GPT-4的训练数据制止到某个时间点,它不知道你们公司上周发布的新规范,也不知道你们昨天刚修复的谁人bug是怎么办理的。
第二,会一本端庄地颠三倒四。 当大模子碰到它不知道的标题时,它不会老老实实说我不知道,而是会基于它学过的通用知识,给你编一个看起来很公道但实在是错的答案。这就是所谓的幻觉(Hallucination)。
RAG(Retrieval-Augmented Generation,检索增强天生)的核心思绪实在很简朴:别让大模子靠想象力答题,先帮它把参考资料找出来,让它照着资料复兴。

详细来说分三步:

  • 把你的私有文档切成小块,转成向量存起来
  • 用户提问时,先根据标题检索出最相干的文档片断
  • 把标题和检索到的内容一起喂给大模子,让它基于这些质料天生答案
听起来不复杂对吧?我其时也是这么想的,然后就踩了一堆坑。
二、第一个大坑:文档切分没那么简朴

我最初的方案特别粗暴——用LangChain的RecursiveCharacterTextSplitter,设置chunk_size=500,overlap=50,直接把全部文档切成小块。

代码写起来确实很简朴:
  1. from langchain.text_splitter import RecursiveCharacterTextSplitter
  2. def naive_split(text):
  3.     最初的简单切分方案——后来证明这是个坑
  4.     splitter = RecursiveCharacterTextSplitter(
  5.         chunk_size=500,
  6.         chunk_overlap=50,
  7.         separators=[\n\n, \n, 。, !, ?,  , ]
  8.     )
  9.     chunks = splitter.split_text(text)
  10.     return chunks
  11. # 测试一下
  12. sample_text =
  13. # MySQL主从切换操作手册
  14. ## 1. 前置检查
  15. 在执行主从切换之前,必须完成以下检查:
  16. - 确认从库同步状态正常(Seconds_Behind_Master = 0)
  17. - 确认没有正在执行的大事务
  18. - 通知相关业务方,确认切换时间窗口
  19. ## 2. 切换步骤
  20. 2.1 在主库执行只读设置
  21. SET GLOBAL read_only = 1;
  22. 2.2 等待从库完全同步
  23. 在从库执行 SHOW SLAVE STATUS,确认 Seconds_Behind_Master = 0
  24. 2.3 停止从库复制
  25. STOP SLAVE;
  26. RESET SLAVE ALL;
  27. ## 3. 回滚方案
  28. 如果切换失败,按以下步骤回滚...
  29. chunks = naive_split(sample_text)
  30. for i, chunk in enumerate(chunks):
  31.     print(f Chunk {i+1} )
  32.     print(chunk[:100] + ... if len(chunk) > 100 else chunk)
复制代码
看起来没毛病是吧?但现实用起来标题大了。
有一次用户问:MySQL切换前须要做哪些查抄?体系返回的文档片断是如许的:
  1. 确认没有正在执行的大事务
  2. - 通知相关业务方,确认切换时间窗口
  3. ## 2. 切换步骤
  4. 2.1 在主库执行只读设置
  5. SET GLOBAL read_only = 1;
复制代码
发现标题了吗?这个片断恰好从查抄步调的中心切开了!第一条查抄项确认从库同步状态正常被切到了上一个chunk里。用户问的是须要做哪些查抄,结果我们给大模子的参考资料里,第一条查抄项就没包罗进去。
核心教导:机器地按字数切分,会打断文档的语义完备性。
厥后我改成了基于语义结构的切分计谋:
  1. import re
  2. from typing import List, Dict
  3. class SmartDocumentSplitter:
  4.    
  5.     语义感知的文档切分器
  6.     核心思路:尊重文档的原有结构,按标题、段落等语义边界切分
  7.    
  8.    
  9.     def __init__(self, max_chunk_size=800, min_chunk_size=100):
  10.         self.max_chunk_size = max_chunk_size
  11.         self.min_chunk_size = min_chunk_size
  12.    
  13.     def split_markdown(self, text: str) -> List[Dict]:
  14.         
  15.         针对Markdown文档的切分
  16.         保持标题层级结构,每个chunk都带上完整的上下文路径
  17.         
  18.         chunks = []
  19.         current_headers = {1: , 2: , 3: }  # 记录当前的标题层级
  20.         
  21.         # 按行处理,识别标题和内容
  22.         lines = text.split('\n')
  23.         current_content = []
  24.         
  25.         for line in lines:
  26.             # 检测Markdown标题
  27.             header_match = re.match(r'^(#{1,3})\s+(.+)$', line)
  28.             
  29.             if header_match:
  30.                 # 遇到新标题,先保存之前的内容
  31.                 if current_content:
  32.                     chunk_text = '\n'.join(current_content).strip()
  33.                     if len(chunk_text) >= self.min_chunk_size:
  34.                         chunks.append({
  35.                             'content': chunk_text,
  36.                             'headers': dict(current_headers),
  37.                             'context_path': self._build_context_path(current_headers)
  38.                         })
  39.                     current_content = []
  40.                
  41.                 # 更新标题层级
  42.                 level = len(header_match.group(1))
  43.                 title = header_match.group(2)
  44.                 current_headers[level] = title
  45.                
  46.                 # 清除下级标题
  47.                 for l in range(level + 1, 4):
  48.                     current_headers[l] =
  49.                
  50.                 current_content.append(line)
  51.             else:
  52.                 current_content.append(line)
  53.                
  54.                 # 如果当前内容超过最大长度,强制切分(但尽量在段落边界)
  55.                 content_so_far = '\n'.join(current_content)
  56.                 if len(content_so_far) > self.max_chunk_size:
  57.                     chunk_text = content_so_far.strip()
  58.                     chunks.append({
  59.                         'content': chunk_text,
  60.                         'headers': dict(current_headers),
  61.                         'context_path': self._build_context_path(current_headers)
  62.                     })
  63.                     current_content = []
  64.         
  65.         # 别忘了最后一段
  66.         if current_content:
  67.             chunk_text = '\n'.join(current_content).strip()
  68.             if len(chunk_text) >= self.min_chunk_size:
  69.                 chunks.append({
  70.                     'content': chunk_text,
  71.                     'headers': dict(current_headers),
  72.                     'context_path': self._build_context_path(current_headers)
  73.                 })
  74.         
  75.         return chunks
  76.    
  77.     def _build_context_path(self, headers: Dict) -> str:
  78.         构建层级路径,比如:MySQL主从切换 > 前置检查
  79.         path_parts = [h for h in [headers[1], headers[2], headers[3]] if h]
  80.         return ' > '.join(path_parts) if path_parts else '未分类'
  81.    
  82.     def enrich_chunk_with_context(self, chunk: Dict) -> str:
  83.         
  84.         关键技巧:给每个chunk加上上下文前缀
  85.         这样即使单独看这个片段,也能知道它属于哪个章节
  86.         
  87.         context = f[文档路径:{chunk['context_path']}]\n\n
  88.         return context + chunk['content']
  89. # 实际使用示例
  90. splitter = SmartDocumentSplitter(max_chunk_size=800)
  91. chunks = splitter.split_markdown(sample_text)
  92. print(f切分后共 {len(chunks)} 个片段\n)
  93. for i, chunk in enumerate(chunks):
  94.     print(f=== Chunk {i+1} ===)
  95.     print(f路径:{chunk['context_path']})
  96.     print(f内容预览:{chunk['content'][:150]}...)
  97.     print()
复制代码
如许切出来的结果就很多多少了。每个chunk开头都会带上它的位置信息,大模子在复兴时能更正确地明确这段内容的上下文。
不外说实话,这个方案也不是万能的。对于那些格式不规范的老文档(没有清晰的标题结构),切分结果依然一样平常。厥后我又针对差别范例的文档做了差异化处置惩罚,这个我们背面再说。
三、第二个大坑:向量检索的语义鸿沟

办理了切分标题,下一步就是向量化和检索了。我用的是开源的BGE模子做Embedding,用Milvus做向量数据库。
第一版的检索代码很直白:
  1. from sentence_transformers import SentenceTransformer
  2. from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
  3. import numpy as np
  4. class VectorStore:
  5.     向量存储和检索
  6.    
  7.     def __init__(self, model_name='BAAI/bge-base-zh-v1.5'):
  8.         # 加载Embedding模型
  9.         self.model = SentenceTransformer(model_name)
  10.         self.dim = 768  # BGE base模型的向量维度
  11.         
  12.         # 连接Milvus
  13.         connections.connect(default, host=localhost, port=19530)
  14.         
  15.     def create_collection(self, collection_name: str):
  16.         创建集合
  17.         if utility.has_collection(collection_name):
  18.             utility.drop_collection(collection_name)
  19.         
  20.         fields = [
  21.             FieldSchema(name=id, dtype=DataType.INT64, is_primary=True, auto_id=True),
  22.             FieldSchema(name=content, dtype=DataType.VARCHAR, max_length=4096),
  23.             FieldSchema(name=context_path, dtype=DataType.VARCHAR, max_length=512),
  24.             FieldSchema(name=embedding, dtype=DataType.FLOAT_VECTOR, dim=self.dim)
  25.         ]
  26.         schema = CollectionSchema(fields, description=知识库文档)
  27.         collection = Collection(collection_name, schema)
  28.         
  29.         # 创建索引
  30.         index_params = {
  31.             metric_type: COSINE,
  32.             index_type: IVF_FLAT,
  33.             params: {nlist: 128}
  34.         }
  35.         collection.create_index(embedding, index_params)
  36.         return collection
  37.    
  38.     def insert_documents(self, collection_name: str, chunks: list):
  39.         插入文档
  40.         collection = Collection(collection_name)
  41.         
  42.         contents = [chunk['content'] for chunk in chunks]
  43.         context_paths = [chunk['context_path'] for chunk in chunks]
  44.         
  45.         # 批量生成Embedding
  46.         embeddings = self.model.encode(contents, normalize_embeddings=True)
  47.         
  48.         collection.insert([contents, context_paths, embeddings.tolist()])
  49.         collection.flush()
  50.         print(f成功插入 {len(chunks)} 条文档)
  51.    
  52.     def search(self, collection_name: str, query: str, top_k: int = 5):
  53.         基础检索
  54.         collection = Collection(collection_name)
  55.         collection.load()
  56.         
  57.         # 生成查询向量
  58.         query_embedding = self.model.encode([query], normalize_embeddings=True)
  59.         
  60.         results = collection.search(
  61.             data=query_embedding.tolist(),
  62.             anns_field=embedding,
  63.             param={metric_type: COSINE, params: {nprobe: 16}},
  64.             limit=top_k,
  65.             output_fields=[content, context_path]
  66.         )
  67.         
  68.         return results[0]
复制代码
根本功能是没标题的。但现实跑起来,我发现了一个让人抓狂的征象——用户的口语化提问和文档的正式表述之间存在巨大的语义鸿沟
举个例子:

  • 用户问:数据库挂了怎么办
  • 文档标题是:MySQL服务非常规复操纵手册
这两个在语义上是相干的,但向量相似度大概并不高。由于用户说的挂了和文档里的非常,用词差异很大。
更坑的是,偶尔候检索出的Top 5结果里,真正相干的那篇大概只排在第3或第4位,但前两名是一些看起来相干但现实上文不对题的内容。假如我只取Top 3喂给大模子,大概就遗漏了最关键的信息。
厥后我接纳了一个两阶段检索的计谋:先用向量检索做粗筛,再用重排序模子做精排。
  1. from transformers import AutoModelForSequenceClassification, AutoTokenizer
  2. import torch
  3. class EnhancedRetriever:
  4.     增强版检索器:向量检索 + 重排序
  5.    
  6.     def __init__(self, vector_store: VectorStore):
  7.         self.vector_store = vector_store
  8.         
  9.         # 加载重排序模型(BGE Reranker效果不错)
  10.         self.reranker_tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-reranker-base')
  11.         self.reranker_model = AutoModelForSequenceClassification.from_pretrained('BAAI/bge-reranker-base')
  12.         self.reranker_model.eval()
  13.    
  14.     def retrieve_with_rerank(self, collection_name: str, query: str,
  15.                              initial_top_k: int = 20, final_top_k: int = 5):
  16.         
  17.         两阶段检索:
  18.         1. 向量检索召回 initial_top_k 个候选
  19.         2. 用重排序模型精排,返回 final_top_k 个结果
  20.         
  21.         # 第一阶段:向量检索(召回更多候选)
  22.         initial_results = self.vector_store.search(collection_name, query, top_k=initial_top_k)
  23.         
  24.         if not initial_results:
  25.             return []
  26.         
  27.         # 准备重排序
  28.         candidates = []
  29.         for hit in initial_results:
  30.             candidates.append({
  31.                 'content': hit.entity.get('content'),
  32.                 'context_path': hit.entity.get('context_path'),
  33.                 'vector_score': hit.score  # 保留向量检索得分,用于调试
  34.             })
  35.         
  36.         # 第二阶段:重排序
  37.         rerank_scores = self._compute_rerank_scores(query, [c['content'] for c in candidates])
  38.         
  39.         for i, score in enumerate(rerank_scores):
  40.             candidates[i]['rerank_score'] = score
  41.         
  42.         # 按重排序分数排序
  43.         candidates.sort(key=lambda x: x['rerank_score'], reverse=True)
  44.         
  45.         return candidates[:final_top_k]
  46.    
  47.     def _compute_rerank_scores(self, query: str, documents: list) -> list:
  48.         计算query和每个文档的相关性分数
  49.         scores = []
  50.         
  51.         with torch.no_grad():
  52.             for doc in documents:
  53.                 # Reranker的输入格式是 [query, document]
  54.                 inputs = self.reranker_tokenizer(
  55.                     [[query, doc]],
  56.                     padding=True,
  57.                     truncation=True,
  58.                     max_length=512,
  59.                     return_tensors='pt'
  60.                 )
  61.                 outputs = self.reranker_model(**inputs)
  62.                 score = outputs.logits.squeeze().item()
  63.                 scores.append(score)
  64.         
  65.         return scores
  66.    
  67.     def retrieve_with_query_expansion(self, collection_name: str, query: str,
  68.                                        llm_client, top_k: int = 5):
  69.         
  70.         进阶技巧:查询扩展
  71.         用大模型改写用户问题,生成多个变体,再合并检索结果
  72.         
  73.         # 让大模型帮我们扩展查询
  74.         expansion_prompt = f请将下面这个问题改写成3个不同的表达方式,保持意思相同但用词不同。
  75. 每行输出一个改写结果,不要序号,不要其他解释。
  76. 原问题:{query}
  77.         
  78.         expanded_queries = llm_client.generate(expansion_prompt).strip().split('\n')
  79.         expanded_queries = [q.strip() for q in expanded_queries if q.strip()]
  80.         
  81.         # 加上原始查询
  82.         all_queries = [query] + expanded_queries[:3]  # 最多取3个扩展查询
  83.         
  84.         print(f扩展后的查询:{all_queries})  # 调试用
  85.         
  86.         # 对每个查询分别检索
  87.         all_candidates = {}
  88.         for q in all_queries:
  89.             results = self.vector_store.search(collection_name, q, top_k=10)
  90.             for hit in results:
  91.                 content = hit.entity.get('content')
  92.                 if content not in all_candidates:
  93.                     all_candidates[content] = {
  94.                         'content': content,
  95.                         'context_path': hit.entity.get('context_path'),
  96.                         'best_score': hit.score,
  97.                         'hit_count': 1
  98.                     }
  99.                 else:
  100.                     # 被多个查询命中的文档,增加权重
  101.                     all_candidates[content]['hit_count'] += 1
  102.                     all_candidates[content]['best_score'] = max(
  103.                         all_candidates[content]['best_score'],
  104.                         hit.score
  105.                     )
  106.         
  107.         # 综合评分:命中次数 * 最高得分
  108.         candidates = list(all_candidates.values())
  109.         for c in candidates:
  110.             c['combined_score'] = c['hit_count'] * c['best_score']
  111.         
  112.         candidates.sort(key=lambda x: x['combined_score'], reverse=True)
  113.         return candidates[:top_k]
复制代码
查询扩展这招特别好用。好比用户问数据库挂了怎么办,大模子大概会扩展成:

  • MySQL服务故障怎样处置惩罚
  • 数据库无法毗连的办理方案
  • 数据库宕机规复步调
这几个查询一起检索,能覆盖更多的相干文档。
四、第三个大坑:Prompt工程的门道比想象中深

检索的标题办理了,接下来就是把检索到的内容和用户标题一起喂给大模子了。这一步我本以为最简朴,没想到也踩了不少坑。
最初的Prompt特别淳厚:
  1. def build_naive_prompt(query: str, context_docs: list) -> str:
  2.     最初的简单Prompt——后来证明太天真了
  3.     context = \n\n.join([doc['content'] for doc in context_docs])
  4.    
  5.     prompt = f根据以下参考资料回答用户问题。
  6. 参考资料:
  7. {context}
  8. 用户问题:{query}
  9. 请回答:
  10.    
  11.     return prompt
复制代码
这个Prompt有几个严峻标题:
标题一:大模子不知道什么时间该说不知道。 当参考资料里确实没有答案时,它照旧会编一个出来。
标题二:没有引导大模子阐明信息泉源。 用户看到答案,不知道是从哪篇文档里来的,无法追溯和验证。
标题三:对于复杂标题,复兴的结构不敷清晰。
厥后迭代了很多版,终极稳固下来的Prompt是如许的:
  1. def build_rag_prompt(query: str, context_docs: list,
  2.                      include_sources: bool = True) -> str:
  3.    
  4.     生产环境使用的Prompt模板
  5.     关键设计:明确角色定位、限制回答范围、要求标注来源
  6.    
  7.    
  8.     # 格式化上下文,每段都标注来源
  9.     context_parts = []
  10.     for i, doc in enumerate(context_docs, 1):
  11.         source = doc.get('context_path', '未知来源')
  12.         context_parts.append(f【资料{i},来源:{source}】\n{doc['content']})
  13.    
  14.     context = \n\n\n\n.join(context_parts)
  15.    
  16.     prompt = f你是一个企业内部知识库助手,专门帮助员工查找和理解公司内部文档。
  17. ## 你的工作准则
  18. 1. **只根据提供的参考资料回答问题**,不要使用你自己的知识。
  19. 2. 如果参考资料中没有相关信息,请明确说根据现有资料,我无法找到关于这个问题的信息,并建议用户联系相关部门或换个关键词搜索。
  20. 3. 回答时请标注信息来源,格式如【资料1】,方便用户追溯原文。
  21. 4. 对于操作类问题,请按步骤清晰地列出;对于概念类问题,先给出简明定义再展开解释。
  22. 5. 如果不同资料中的信息有冲突,请指出差异并说明各自的适用场景。
  23. ## 参考资料
  24. {context}
  25. ## 用户问题
  26. {query}
  27. ## 回答要求
  28. 请根据上述参考资料回答用户问题。记住:
  29. - 只使用参考资料中的信息
  30. - 标注信息来源
  31. - 没有把握的内容不要编造
  32.    
  33.     return prompt
  34. def build_conversational_prompt(query: str, context_docs: list,
  35.                                  chat_history: list = None) -> str:
  36.    
  37.     支持多轮对话的Prompt
  38.     需要带上历史对话记录,让大模型理解上下文
  39.    
  40.    
  41.     context_parts = []
  42.     for i, doc in enumerate(context_docs, 1):
  43.         source = doc.get('context_path', '未知来源')
  44.         context_parts.append(f【资料{i},来源:{source}】\n{doc['content']})
  45.     context = \n\n\n\n.join(context_parts)
  46.    
  47.     # 格式化历史对话
  48.     history_text =
  49.     if chat_history:
  50.         history_parts = []
  51.         for turn in chat_history[-5:]:  # 只保留最近5轮,避免太长
  52.             history_parts.append(f用户:{turn['user']})
  53.             history_parts.append(f助手:{turn['assistant']})
  54.         history_text = \n.join(history_parts)
  55.    
  56.     prompt = f你是一个企业内部知识库助手。
  57. ## 参考资料
  58. {context}
  59. ## 对话历史
  60. {history_text if history_text else (这是对话的开始)}
  61. ## 当前问题
  62. 用户:{query}
  63. ## 回答准则
  64. 1. 优先根据参考资料回答,如无相关信息请明确说明
  65. 2. 考虑对话历史的上下文(如用户说它可能指代之前提到的概念)
  66. 3. 标注信息来源
  67. 助手:
  68.    
  69.     return prompt
复制代码
关于Prompt,我还想分享一个很告急的履历:不要试图在一个Prompt里塞太多指令
一开始我把各种要求都写进去:复兴要正确、要简便、要友爱、要专业、要标注泉源、要分步调、碰到不确定要说不知道……结果发现模子反而被绕晕了,偶尔候顾了这个忘了谁人。
厥后我的做法是:区分核心指令和优化指令,核心指令必须生存,优化指令可以根据标题范例动态调解。
  1. class PromptBuilder:
  2.     Prompt构建器:根据问题类型动态调整
  3.    
  4.     # 核心指令——任何情况都必须包含
  5.     CORE_INSTRUCTIONS =
  6. 1. 只使用参考资料中的信息回答,不要编造
  7. 2. 资料中没有的信息,明确说无法找到相关信息
  8. 3. 标注信息来源【资料X】
  9.    
  10.     # 操作类问题的额外指令
  11.     PROCEDURE_INSTRUCTIONS =
  12. 回答格式要求:
  13. - 按步骤编号列出(第一步、第二步...)
  14. - 每个步骤要明确操作对象和操作动作
  15. - 重要的警告或注意事项用⚠️标出
  16.    
  17.     # 概念解释类问题的额外指令
  18.     CONCEPT_INSTRUCTIONS =
  19. 回答格式要求:
  20. - 先用一句话给出核心定义
  21. - 再详细解释关键点
  22. - 如有必要,举例说明
  23.     # 故障排查类问题的额外指令
  24.     TROUBLESHOOT_INSTRUCTIONS =
  25. 回答格式要求:
  26. - 先列出可能的原因
  27. - 针对每个原因给出排查方法
  28. - 给出解决方案或规避建议
  29.    
  30.     @classmethod
  31.     def build(cls, query: str, context_docs: list, question_type: str = general) -> str:
  32.         根据问题类型构建Prompt
  33.         
  34.         # 简单的问题分类逻辑(实际项目中可以用分类模型)
  35.         if question_type == auto:
  36.             question_type = cls._classify_question(query)
  37.         
  38.         extra_instructions =
  39.         if question_type == procedure:
  40.             extra_instructions = cls.PROCEDURE_INSTRUCTIONS
  41.         elif question_type == concept:
  42.             extra_instructions = cls.CONCEPT_INSTRUCTIONS
  43.         elif question_type == troubleshoot:
  44.             extra_instructions = cls.TROUBLESHOOT_INSTRUCTIONS
  45.         
  46.         context = cls._format_context(context_docs)
  47.         
  48.         prompt = f你是企业内部知识库助手。
  49. ## 必须遵守的规则
  50. {cls.CORE_INSTRUCTIONS}
  51. {f## 回答格式{extra_instructions} if extra_instructions else }
  52. ## 参考资料
  53. {context}
  54. ## 用户问题
  55. {query}
  56. 请回答:
  57.         
  58.         return prompt
  59.    
  60.     @classmethod
  61.     def _classify_question(cls, query: str) -> str:
  62.         简单的问题分类(基于关键词)
  63.         procedure_keywords = [怎么做, 如何操作, 步骤, 流程, 怎样]
  64.         concept_keywords = [是什么, 什么是, 定义, 解释, 区别]
  65.         troubleshoot_keywords = [为什么, 报错, 失败, 异常, 问题, 故障]
  66.         
  67.         query_lower = query.lower()
  68.         
  69.         if any(kw in query_lower for kw in procedure_keywords):
  70.             return procedure
  71.         elif any(kw in query_lower for kw in concept_keywords):
  72.             return concept
  73.         elif any(kw in query_lower for kw in troubleshoot_keywords):
  74.             return troubleshoot
  75.         else:
  76.             return general
  77.    
  78.     @classmethod
  79.     def _format_context(cls, context_docs: list) -> str:
  80.         parts = []
  81.         for i, doc in enumerate(context_docs, 1):
  82.             source = doc.get('context_path', '未知来源')
  83.             parts.append(f【资料{i},来源:{source}】\n{doc['content']})
  84.         return \n\n\n\n.join(parts)
复制代码
五、串起来:完备的RAG Pipeline

前面说了一堆细节,如今把它们串成一个完备的Pipeline:
  1. from openai import OpenAI
  2. from typing import List, Dict, Optional
  3. import json
  4. class RAGPipeline:
  5.    
  6.     完整的RAG处理流程
  7.     文档切分 -> 向量化存储 -> 检索 -> 重排序 -> 生成回答
  8.    
  9.    
  10.     def __init__(self,
  11.                  llm_base_url: str = https://api.deepseek.com,
  12.                  llm_api_key: str = your-api-key,
  13.                  llm_model: str = deepseek-chat):
  14.         
  15.         # 初始化各个组件
  16.         self.splitter = SmartDocumentSplitter(max_chunk_size=800)
  17.         self.vector_store = VectorStore()
  18.         self.retriever = EnhancedRetriever(self.vector_store)
  19.         
  20.         # 初始化LLM客户端(这里用DeepSeek,也可以换成其他的)
  21.         self.llm_client = OpenAI(base_url=llm_base_url, api_key=llm_api_key)
  22.         self.llm_model = llm_model
  23.         
  24.         self.collection_name = knowledge_base
  25.    
  26.     def ingest_documents(self, documents: List[Dict]):
  27.         
  28.         文档入库
  29.         documents格式:[{title: 文档标题, content: 文档内容, source: 来源}]
  30.         
  31.         print(f开始处理 {len(documents)} 篇文档...)
  32.         
  33.         all_chunks = []
  34.         for doc in documents:
  35.             # 在内容前加上标题,帮助切分器识别结构
  36.             full_content = f# {doc['title']}\n\n{doc['content']}
  37.             
  38.             chunks = self.splitter.split_markdown(full_content)
  39.             
  40.             # 给每个chunk加上文档来源信息
  41.             for chunk in chunks:
  42.                 chunk['source_doc'] = doc.get('source', doc['title'])
  43.             
  44.             all_chunks.extend(chunks)
  45.         
  46.         print(f切分后共 {len(all_chunks)} 个片段)
  47.         
  48.         # 创建集合并插入
  49.         self.vector_store.create_collection(self.collection_name)
  50.         self.vector_store.insert_documents(self.collection_name, all_chunks)
  51.         
  52.         print(文档入库完成!)
  53.    
  54.     def query(self,
  55.               question: str,
  56.               chat_history: Optional[List[Dict]] = None,
  57.               top_k: int = 5,
  58.               use_rerank: bool = True) -> Dict:
  59.         
  60.         处理用户查询
  61.         返回:{answer: 回答内容, sources: [引用的来源], retrieved_docs: [检索到的文档]}
  62.         
  63.         
  64.         # 1. 检索相关文档
  65.         if use_rerank:
  66.             retrieved_docs = self.retriever.retrieve_with_rerank(
  67.                 self.collection_name, question,
  68.                 initial_top_k=20, final_top_k=top_k
  69.             )
  70.         else:
  71.             results = self.vector_store.search(self.collection_name, question, top_k=top_k)
  72.             retrieved_docs = [{
  73.                 'content': hit.entity.get('content'),
  74.                 'context_path': hit.entity.get('context_path'),
  75.                 'score': hit.score
  76.             } for hit in results]
  77.         
  78.         if not retrieved_docs:
  79.             return {
  80.                 answer: 抱歉,我没有找到与您问题相关的资料。您可以尝试换个关键词,或联系相关部门获取帮助。,
  81.                 sources: [],
  82.                 retrieved_docs: []
  83.             }
  84.         
  85.         # 2. 构建Prompt
  86.         if chat_history:
  87.             prompt = build_conversational_prompt(question, retrieved_docs, chat_history)
  88.         else:
  89.             prompt = PromptBuilder.build(question, retrieved_docs, question_type=auto)
  90.         
  91.         # 3. 调用LLM生成回答
  92.         response = self.llm_client.chat.completions.create(
  93.             model=self.llm_model,
  94.             messages=[{role: user, content: prompt}],
  95.             temperature=0.3,  # 知识库问答用较低的temperature
  96.             max_tokens=2000
  97.         )
  98.         
  99.         answer = response.choices[0].message.content
  100.         
  101.         # 4. 提取引用的来源
  102.         sources = list(set([doc.get('context_path', '未知来源') for doc in retrieved_docs]))
  103.         
  104.         return {
  105.             answer: answer,
  106.             sources: sources,
  107.             retrieved_docs: retrieved_docs
  108.         }
  109.    
  110.     def evaluate_response(self, question: str, answer: str,
  111.                           ground_truth: str = None) -> Dict:
  112.         
  113.         回答质量评估(可选)
  114.         用LLM评估回答的质量,方便持续优化
  115.         
  116.         eval_prompt = f请评估以下问答的质量。
  117. 问题:{question}
  118. 回答:{answer}
  119. {f参考答案:{ground_truth} if ground_truth else }
  120. 请从以下维度评分(1-5分)并说明理由:
  121. 1. 相关性:回答是否切题
  122. 2. 准确性:信息是否正确
  123. 3. 完整性:是否完整解答了问题
  124. 4. 可读性:表述是否清晰易懂
  125. 请用JSON格式输出:{{relevance: 分数, accuracy: 分数, completeness: 分数, readability: 分数, comments: 评价说明}}
  126.         
  127.         response = self.llm_client.chat.completions.create(
  128.             model=self.llm_model,
  129.             messages=[{role: user, content: eval_prompt}],
  130.             temperature=0
  131.         )
  132.         
  133.         try:
  134.             eval_result = json.loads(response.choices[0].message.content)
  135.             return eval_result
  136.         except:
  137.             return {error: 评估结果解析失败}
  138. # 使用示例
  139. if __name__ == __main__:
  140.     # 初始化Pipeline
  141.     rag = RAGPipeline(
  142.         llm_base_url=https://api.deepseek.com,
  143.         llm_api_key=your-api-key,
  144.         llm_model=deepseek-chat
  145.     )
  146.    
  147.     # 准备测试文档
  148.     test_documents = [
  149.         {
  150.             title: MySQL主从切换操作手册,
  151.             content: """
  152.                                 ## 1. 前置检查
  153.                                 在执行主从切换之前,必须完成以下检查:
  154.                                 - 确认从库同步状态正常(Seconds_Behind_Master = 0)
  155.                                 - 确认没有正在执行的大事务
  156.                                 - 通知相关业务方,确认切换时间窗口
  157.                                 ## 2. 切换步骤
  158.                                 ### 2.1 在主库执行只读设置
  159.                                 SET GLOBAL read_only = 1;
  160.                                 ### 2.2 等待从库完全同步
  161.                                 在从库执行 SHOW SLAVE STATUS,确认 Seconds_Behind_Master = 0
  162.                                 ### 2.3 停止从库复制并提升为主库
  163.                                 STOP SLAVE;
  164.                                 RESET SLAVE ALL;
  165.                                 SET GLOBAL read_only = 0;
  166.                                 ## 3. 切换后验证
  167.                                 - 确认新主库可以正常写入
  168.                                 - 确认应用连接已切换到新主库
  169.                                 - 监控新主库的性能指标
  170.                                 """,
  171.             source: DBA团队文档
  172.         }
  173.     ]
  174.    
  175.     # 入库
  176.     rag.ingest_documents(test_documents)
  177.    
  178.     # 测试查询
  179.     result = rag.query(MySQL切换前需要做哪些检查?)
  180.    
  181.     print(= * 50)
  182.     print(问题:MySQL切换前需要做哪些检查?)
  183.     print(= * 50)
  184.     print(f\n回答:\n{result['answer']})
  185.     print(f\n参考来源:{result['sources']})
复制代码
六、上线后的一些履历教导

体系上线到如今差不多两个月了,期间又踩了不少坑,这里挑几个印象最深的说说。
教导一:用户的标题光怪陆离

我们在计划时假设用户会问MySQL怎么做主从切换这种正常标题。但现实上呢?
有人问:前次谁人变乱怎么处置惩罚的来着?——没有任何上下文,体系根本不知道谁人变乱是哪个。
有人问:帮我写个SQL。——这根本不是知识库问答,这是让大模子帮写代码。
另有人问:在吗?——我也不知道他想干啥。
厥后我加了一个意图辨认层,先判定用户的标题是否属于知识库问答的范畴:
  1. def classify_intent(self, query: str) -> str:
  2.     """识别用户意图"""
  3.     intent_prompt = f"""判断用户输入的意图类别,只输出类别名称:
  4. - knowledge_query:查询知识库信息(如询问流程、规范、操作方法)
  5. - code_request:请求生成代码
  6. - chitchat:闲聊或无明确意图
  7. - other:其他
  8. 用户输入:{query}
  9. 意图类别:"""
  10.    
  11.     response = self.llm_client.chat.completions.create(
  12.         model=self.llm_model,
  13.         messages=[{"role": "user", "content": intent_prompt}],
  14.         temperature=0,
  15.         max_tokens=20
  16.     )
  17.    
  18.     return response.choices[0].message.content.strip()
复制代码
对于非知识库问答的意图,给用户一个友爱的提示而不是硬着头皮检索。
教导二:冷启动时的尴尬

体系刚上线时,知识库里的文档不多,覆盖的场景有限。用户问了几个标题都答不上来,体验特别差,于是就不来用了。
厥后的办理办法:

  • 上线前先梳理高频标题,确保至少这些标题能复兴好
  • 搞了一个标题网络功能,对于答不上来的标题,纪录下来反馈给内容团队,让他们增补相干文档
  • 做了一个兜底计谋——假如检索不到高相干度的内容,就展示相干保举,把一些相似度尚可的文档标题列出来,引导用户自己去看
教导三:文档更新的同步标题

知识库的文档是会更新的。老版本的操纵手册废弃了,新版本发布了。但假如向量数据库里还存着老版本的内容,用户检索到的大概是过期信息。
这个标题提及来简朴,做起来挺贫苦的。我们末了的方案是:

  • 每个文档入库时纪录版本号和更新时间
  • 定期全量重新入库(我们是每周一次)
  • 对于告急更新的告急文档,支持手动触发单篇重入库
七、性能优化:让体系不那么慢

RAG体系有个让人头疼的标题——慢。
整个流程跑一遍:Embedding编码、向量检索、重排序、LLM天生,全部加起来大概要好几秒。用户体验就很差,问一个标题要等半天。

几个优化步调:
  1. import asyncio
  2. from functools import lru_cache
  3. import hashlib
  4. class OptimizedRAG:
  5.     性能优化版RAG
  6.    
  7.     def __init__(self):
  8.         # 缓存热门查询的结果
  9.         self.query_cache = {}
  10.         self.cache_ttl = 3600  # 1小时过期
  11.    
  12.     @lru_cache(maxsize=1000)
  13.     def _compute_query_embedding(self, query: str):
  14.         
  15.         Embedding结果缓存
  16.         同样的问题不用重复计算向量
  17.         
  18.         return self.model.encode([query], normalize_embeddings=True)[0]
  19.    
  20.     def _get_cache_key(self, query: str) -> str:
  21.         生成缓存key
  22.         return hashlib.md5(query.lower().strip().encode()).hexdigest()
  23.    
  24.     async def stream_query(self, question: str):
  25.         
  26.         流式输出
  27.         不用等整个回答生成完,边生成边输出
  28.         
  29.         retrieved_docs = await asyncio.to_thread(
  30.             self.retriever.retrieve_with_rerank,
  31.             self.collection_name, question, 20, 5
  32.         )
  33.         
  34.         prompt = PromptBuilder.build(question, retrieved_docs, question_type=auto)
  35.         
  36.         # 使用流式API
  37.         stream = self.llm_client.chat.completions.create(
  38.             model=self.llm_model,
  39.             messages=[{role: user, content: prompt}],
  40.             temperature=0.3,
  41.             stream=True  # 开启流式
  42.         )
  43.         
  44.         for chunk in stream:
  45.             if chunk.choices[0].delta.content:
  46.                 yield chunk.choices[0].delta.content
复制代码
流式输出这一点特别告急。用户问完标题后,立刻就能看到复兴在打字,生理上就不会以为那么慢了。
八、回顾与思考

把这套体系从被骂下线到成为部门标配,前后折腾了快要一个月。趟过的坑挺多,但劳绩也很大。
几点核心总结:
1. RAG不是万能的,选好实用场景
RAG得当有明确知识库、答案可追溯的场景。假如你的需求是让大模子发挥创造力(好比写文案、做创意),那RAG反而是个束缚。
2. 切分和检索是根基
各人每每把注意力放在大模子自己,以为用更强的模子就能办理标题。但现实上,假如前面的切分和检索做得欠好,再强的模子也是巧妇难为无米之炊。
3. Prompt工程真的是门技术
同样的检索结果,差别的Prompt大概带来天壤之别的复兴结果。这个没什么捷径,就是多试、多看、多迭代。
4. 上线只是开始
真正的寻衅在上线之后。用户的各种奇葩输入、文档的连续更新、性能的优化、结果的监控……每一项都是连续的工作。

末了,附上这套体系如今的一些核心指标:

  • 日均查询量:200+次
  • 匀称相应时间:2.3秒(开启流式后首字符耽误约0.8秒)
  • 用户满意度(通过复兴后的点赞/点踩网络):约72%
  • 无法复兴的比例:约22%(这部门会定期分析,推动增补文档)
数字不算特别亮眼,但比起之前谁人没人用的关键词搜索,已经是质的飞跃了。假如你也在做类似的项目,渴望这篇文章能帮你少踩一些坑。有标题欢迎品评区交换!

免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金.

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表