系列导航:[第一篇] → ... → [第八篇:Agent 开发] → 第九篇(当前) → 第十篇:LangGraph 初探 前置要求:已掌握前八篇内容,理解 LCEL 链的组合方式、OutputParser 和 ChatPromptTemplate 本篇目标:从零开始掌握 RAG(检索增强生成)的完整技术链路。理解为什么需要 RAG、每个环节的设计选择背后的原理,能够独立构建一个生产可用的本地文档问答系统,并了解常见的检索质量优化手段。
系列导航:[第一篇] → ... → [第八篇:Agent 开发] → 第九篇(当前) → 第十篇:LangGraph 初探
前置要求:已掌握前八篇内容,理解 LCEL 链的组合方式、OutputParser 和 ChatPromptTemplate
本篇目标:从零开始掌握 RAG(检索增强生成)的完整技术链路。理解为什么需要 RAG、每个环节的设计选择背后的原理,能够独立构建一个生产可用的本地文档问答系统,并了解常见的检索质量优化手段。
大模型虽然强大,但它有两个本质上的知识限制,是任何微调都解决不了的:
限制一:知识截止日期。模型的知识停留在训练数据的时间点。GPT-4o 不知道你们公司上周发布的产品,DeepSeek 不了解今天的市场行情。对于需要实时或近期信息的问题,模型只能"编造"或者说"我不知道"。
限制二:私有知识盲区。模型只训练了公开数据。你公司的内部文档、产品手册、合同模板、历史报告——这些内容模型完全不知道,无论你问得多仔细,它都无法准确回答涉及这些内容的问题。
微调能解决这个问题吗? 很多人第一反应是"把私有文档拿去微调模型"。但微调有几个实际问题:成本极高(动辄数万元)、更新困难(数据变了要重新训练)、知识注入不可靠(模型可能"遗忘"之前的训练)。对于私有知识库场景,微调几乎不是正确答案。
RAG 的解法:不要把知识"烧进"模型,而是在每次提问时,先从知识库里检索出相关内容,再把这些内容连同问题一起发给模型,让模型"参考资料"作答。就像开卷考试——学生(模型)不需要记住所有内容,考试时(推理时)可以翻书(检索文档)。
这个解法的优点显而易见:知识库更新只需要更新文档,不需要重新训练模型;支持任意私有数据;成本可控;可以精确追溯答案来源。
RAG 的完整技术链路分为两个阶段:离线阶段(文档处理和入库,通常只做一次)和在线阶段(每次用户提问时执行的检索和生成)。
【离线阶段:文档入库】 原始文档(PDF/Word/网页/...) ↓ Load(加载) Document 对象列表 ↓ Split(分割) 小的 Chunk 列表 ↓ Embed(向量化) 向量(浮点数数组) ↓ Store(存储) VectorStore(向量数据库) 【在线阶段:查询和生成】 用户问题 ↓ Embed(把问题也向量化) 问题向量 ↓ Retrieve(检索) 最相关的 N 个 Chunk ↓ Augment(增强:把 Chunk 塞进 Prompt) 带上下文的 Prompt ↓ Generate(生成) 最终回答
下面我们逐一讲解每个环节的原理、选择和代码实现。
第一步是把原始文档读入 Python,转换成 LangChain 能处理的 Document 对象。Document 是一个简单的数据类,包含两个字段:page_content(文本内容)和 metadata(元数据,如文件名、页码等)。
Document
page_content
metadata
from langchain_core.documents import Document # Document 是 RAG 流程中的基本数据单元 doc = Document( page_content="这里是文档的文本内容,可以是任意长度的字符串。", metadata={ "source": "company_handbook.pdf", # 来源文件 "page": 3, # 页码 "author": "人力资源部", # 作者 "created_at": "2024-01-15", # 创建时间 # 元数据可以包含任意键值对,会跟随 Chunk 一路传递 # 最终可以在检索结果中用来追溯答案来源 } ) print(doc.page_content) # 文本内容 print(doc.metadata) # 元数据字典
元数据非常重要。当用户问"这个说法从哪里来的"时,你可以通过元数据里的 source 和 page 精确指出答案来自哪个文档的哪一页,大大提升系统的可信度。
source
page
from langchain_community.document_loaders import PyPDFLoader # PyPDFLoader 按页加载 PDF,每页变成一个 Document loader = PyPDFLoader("documents/company_handbook.pdf") pages = loader.load() print(f"共加载 {len(pages)} 页") print(f"第一页内容(前 200 字):{pages[0].page_content[:200]}") print(f"第一页元数据:{pages[0].metadata}") # {'source': 'documents/company_handbook.pdf', 'page': 0} # 如果 PDF 有目录结构,可以用 load_and_split() 自动按节分割 # 但通常我们还是手动控制分割策略(第四节详解)
如果你的 PDF 是扫描版(图片格式),普通的 PDF 加载器会读到空内容。这种情况需要 OCR:
# 扫描版 PDF 需要 OCR(需要安装 tesseract) from langchain_community.document_loaders import UnstructuredPDFLoader loader = UnstructuredPDFLoader( "documents/scanned_report.pdf", mode="elements", # 按元素(段落、标题、表格等)分割 strategy="hi_res", # 高精度模式,调用 OCR ) docs = loader.load()
from langchain_community.document_loaders import WebBaseLoader import bs4 # 基础网页加载(会包含大量 HTML 标签和导航栏等噪声) loader = WebBaseLoader("https://python.langchain.com/docs/introduction/") docs = loader.load() # 进阶:用 BeautifulSoup 过滤,只保留正文内容 loader_clean = WebBaseLoader( web_paths=["https://python.langchain.com/docs/introduction/"], bs_kwargs={ "parse_only": bs4.SoupStrainer( class_=("theme-doc-markdown", "docMainContainer") # 只提取这些 CSS 类名对应的元素(根据目标网站调整) ) }, ) docs_clean = loader_clean.load() print(f"清理后内容长度:{len(docs_clean[0].page_content)} 字符") # 批量加载多个 URL urls = [ "https://example.com/page1", "https://example.com/page2", "https://example.com/page3", ] loader_batch = WebBaseLoader(web_paths=urls) all_docs = loader_batch.load() print(f"共加载 {len(all_docs)} 个网页")
from langchain_community.document_loaders import ( TextLoader, # .txt 纯文本 CSVLoader, # .csv 表格(每行变一个 Document) UnstructuredWordDocumentLoader, # .docx Word 文档 DirectoryLoader, # 批量加载整个目录 JSONLoader, # .json 文件(需要指定提取路径) NotionDirectoryLoader, # Notion 导出的 Markdown 目录 ) # 加载 Word 文档 loader = UnstructuredWordDocumentLoader("documents/report.docx") docs = loader.load() # 加载 CSV(每行一条记录) loader = CSVLoader( "data/products.csv", csv_args={"delimiter": ","}, source_column="product_name", # 用哪列作为 source 元数据 ) docs = loader.load() # 每个 Document 的 page_content 是这一行所有列的内容 # 批量加载目录下所有 PDF loader = DirectoryLoader( "documents/", glob="**/*.pdf", # 递归匹配所有 PDF loader_cls=PyPDFLoader, # 用 PyPDFLoader 加载每个文件 show_progress=True, # 显示进度条 ) all_docs = loader.load() print(f"共加载 {len(all_docs)} 个文档片段")
有时候文档不在文件里,而是在数据库或 API 里。可以继承 BaseLoader 实现自定义加载器:
BaseLoader
from langchain_core.document_loaders import BaseLoader from langchain_core.documents import Document from typing import Iterator import sqlite3 class SQLiteDocumentLoader(BaseLoader): """ 从 SQLite 数据库加载文档。 把数据库里的每条记录当作一个文档来处理。 """ def __init__(self, db_path: str, table: str, content_column: str, metadata_columns: list = None): self.db_path = db_path self.table = table self.content_column = content_column self.metadata_columns = metadata_columns or [] def lazy_load(self) -> Iterator[Document]: """ 懒加载:使用生成器逐条返回 Document,适合大型数据集 相比 load()(一次性加载全部),lazy_load 更节省内存 """ conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row # 让结果可以按列名访问 cursor = conn.cursor() cursor.execute(f"SELECT * FROM {self.table}") for row in cursor: # 提取文本内容 content = str(row[self.content_column]) # 提取元数据 metadata = {col: row[col] for col in self.metadata_columns if col in row.keys()} metadata["source"] = f"sqlite://{self.db_path}/{self.table}" yield Document(page_content=content, metadata=metadata) conn.close() # 使用 loader = SQLiteDocumentLoader( db_path="knowledge_base.db", table="articles", content_column="body", metadata_columns=["title", "author", "category", "created_date"], ) docs = list(loader.lazy_load()) print(f"从数据库加载了 {len(docs)} 篇文章")
加载完文档后,下一步是把它切成小块(Chunk)。这一步非常关键,直接影响后续检索的质量,但也常常被初学者忽视。
分割的必要性来自两个方面:
检索精度:如果把整个 200 页的 PDF 作为一个检索单元,当用户问第 37 页的某个细节时,整个文档都会被检索到,但模型在 200 页文字里找到正确答案的能力是有限的。把文档切成小块,检索时返回的就是与问题最相关的那几个小块,模型能更精准地定位答案。
上下文窗口限制:即使模型有 128K 的上下文窗口,把所有文档都塞进去也不现实(成本极高)。分割后的 Chunk 让我们可以只发送最相关的几块内容。
from langchain_text_splitters import RecursiveCharacterTextSplitter # RecursiveCharacterTextSplitter 是生产环境最推荐的文本分割器 # 它按优先级递归尝试不同的分隔符,尽可能保持语义完整性 splitter = RecursiveCharacterTextSplitter( chunk_size=500, # 每个 Chunk 的最大字符数 chunk_overlap=50, # 相邻 Chunk 之间的重叠字符数(保证边界信息不丢失) separators=[ # 分隔符优先级列表,从高到低尝试 "\n\n", # 优先在段落边界分割(两个换行) "\n", # 其次在行边界分割 "。", "!", "?", # 再次在句子边界分割(中文标点) ".", "!", "?", # 英文标点 " ", # 然后在词边界 "", # 最后才在任意字符处强制分割 ], length_function=len, # 用什么函数计算长度(默认是字符数) ) # 分割文档 from langchain_core.documents import Document sample_doc = Document( page_content=""" 第一章:公司简介 我们是一家专注于人工智能技术的科技公司,成立于2018年。 公司总部位于北京,在上海、广州设有分公司。 主要产品包括: 1. 智能客服系统 2. 数据分析平台 3. 自动化运营工具 第二章:公司文化 我们相信技术改变世界,以用户价值为核心驱动力。 团队由来自国内外顶尖高校和知名企业的优秀人才组成。 """, metadata={"source": "company_intro.txt", "page": 1} ) chunks = splitter.split_documents([sample_doc]) print(f"分割成 {len(chunks)} 个 Chunk") for i, chunk in enumerate(chunks): print(f"\n--- Chunk {i+1} ---") print(f"内容({len(chunk.page_content)} 字):{chunk.page_content[:100]}...") print(f"元数据:{chunk.metadata}") # 注意:元数据会从原始 Document 继承,并自动追加 start_index
这是 RAG 系统调优中最常见的问题。没有放之四海皆准的最优值,需要根据你的具体场景来决定:
# 场景一:技术文档、法律合同(需要精确定位细节) # → 推荐较小的 chunk_size,保证检索粒度足够细 fine_grained_splitter = RecursiveCharacterTextSplitter( chunk_size=300, # 约 150-200 个中文字 chunk_overlap=30, # 重叠约 10% ) # 场景二:新闻文章、产品描述(段落之间联系紧密) # → 推荐中等 chunk_size,保留更多上下文 medium_splitter = RecursiveCharacterTextSplitter( chunk_size=800, chunk_overlap=100, ) # 场景三:研究报告、书籍章节(需要整体理解) # → 推荐较大的 chunk_size coarse_splitter = RecursiveCharacterTextSplitter( chunk_size=1500, chunk_overlap=200, ) # 通用建议: # - chunk_size:300 到 1000 字符之间开始测试,根据检索质量调整 # - chunk_overlap:chunk_size 的 10%-20%,保证边界处的内容不丢失 # - 检验方法:取几个有代表性的问题,看检索到的 Chunk 是否包含正确答案
字符数分割不能精确控制 Token 消耗。如果你需要严格控制进入模型的 Token 数,可以按 Token 数分割:
from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_openai import ChatOpenAI # tiktoken 是 OpenAI 的 tokenizer,其他模型用对应的 tokenizer splitter_by_token = RecursiveCharacterTextSplitter.from_tiktoken_encoder( model_name="gpt-4o", # 使用对应模型的 tokenizer chunk_size=256, # 每个 Chunk 最多 256 个 Token chunk_overlap=32, # 重叠 32 个 Token ) # 验证:同一段文本,字符数分割 vs Token 数分割 text = "这是一段测试文本。" * 100 # 重复 100 次的中文文本 char_chunks = RecursiveCharacterTextSplitter(chunk_size=200).split_text(text) token_chunks = splitter_by_token.split_text(text) print(f"字符数分割:{len(char_chunks)} 个 Chunk") print(f"Token 数分割:{len(token_chunks)} 个 Chunk") # 中文每个字约 1-2 个 Token,所以 Token 分割的 Chunk 更少但每个更大
from langchain_text_splitters import ( MarkdownHeaderTextSplitter, # 按 Markdown 标题层级分割 HTMLHeaderTextSplitter, # 按 HTML 标题标签分割 PythonCodeTextSplitter, # 按 Python 代码结构分割(函数、类) Language, # 支持多种编程语言 RecursiveCharacterTextSplitter, ) # Markdown 文档:按标题层级分割(保留文档结构语义) markdown_splitter = MarkdownHeaderTextSplitter( headers_to_split_on=[ ("#", "h1"), # 一级标题 ("##", "h2"), # 二级标题 ("###", "h3"), # 三级标题 ] ) markdown_text = """ # 产品手册 ## 安装指南 ### Windows 安装 下载安装包,双击运行,按照向导操作。 ### Mac 安装 打开终端,运行 brew install product。 ## 使用教程 ### 快速入门 首先创建账号,然后登录系统... """ md_chunks = markdown_splitter.split_text(markdown_text) for chunk in md_chunks: print(f"内容:{chunk.page_content[:50]}") print(f"层级:{chunk.metadata}") # {'h1': '产品手册', 'h2': '安装指南', 'h3': 'Windows 安装'} print() # 代码文件:按函数/类边界分割 code_splitter = RecursiveCharacterTextSplitter.from_language( language=Language.PYTHON, chunk_size=1000, chunk_overlap=100, )
分割完成后,需要把每个 Chunk 转换成向量(Embedding)。向量是一个浮点数数组,语义相似的文本会产生相近的向量。正是这个"语义相近 → 向量相近"的特性,使得我们可以用向量相似度来实现语义检索。
想象一个三维空间,每个词对应空间中的一个点。"猫"和"狗"靠近(都是宠物),"猫"和"汽车"很远(毫无关联)。Embedding 模型做的事情,就是把文本映射到一个高维(通常是 768 或 1536 维)的向量空间里,语义相似的文本在这个空间里距离更近。
用户提问时,问题也被转换成向量,然后在向量空间里找"距离最近"的 Chunk——这就是语义检索,它比关键词匹配更强大,能理解同义词、改写表达等。
from langchain_openai import OpenAIEmbeddings import numpy as np # 初始化 Embedding 模型 embeddings = OpenAIEmbeddings( model="text-embedding-3-small", # 推荐:性价比最高 # model="text-embedding-3-large", # 精度更高,成本约 5 倍 # model="text-embedding-ada-002", # 旧版,不推荐新项目使用 ) # 单条文本向量化 text = "Python 是一种解释型编程语言" vector = embeddings.embed_query(text) print(f"向量维度:{len(vector)}") # 1536 print(f"向量前 5 个值:{vector[:5]}") # [0.023, -0.156, 0.089, ...] print(f"向量类型:{type(vector[0])}") # float # 批量向量化(推荐:比逐条调用高效得多) texts = [ "Python 是一种解释型编程语言", "Java 是强类型的面向对象语言", "今天天气很好", "机器学习需要大量数据", ] vectors = embeddings.embed_documents(texts) print(f"批量向量化:{len(vectors)} 条,每条 {len(vectors[0])} 维") # 验证语义相似性 def cosine_similarity(v1, v2): """计算两个向量的余弦相似度(越接近 1 越相似)""" v1, v2 = np.array(v1), np.array(v2) return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)) sim_programming = cosine_similarity(vectors[0], vectors[1]) # Python vs Java sim_unrelated = cosine_similarity(vectors[0], vectors[2]) # Python vs 天气 print(f"\nPython vs Java 相似度:{sim_programming:.4f}") # 较高(都是编程语言) print(f"Python vs 天气 相似度:{sim_unrelated:.4f}") # 较低(毫无关联)
# 选项一:使用 DashScope(阿里云)的 text-embedding-v2 # 适合内网环境或不想依赖 OpenAI 的场景 from langchain_community.embeddings import DashScopeEmbeddings embeddings_dashscope = DashScopeEmbeddings( model="text-embedding-v2", dashscope_api_key="your-dashscope-key", ) # 选项二:本地运行开源 Embedding 模型(数据不出内网) # 推荐模型:BAAI/bge-m3(多语言,效果优秀) from langchain_community.embeddings import HuggingFaceEmbeddings embeddings_local = HuggingFaceEmbeddings( model_name="BAAI/bge-m3", # 会自动从 HuggingFace 下载模型 model_kwargs={"device": "cpu"}, # 改成 "cuda" 可以用 GPU 加速 encode_kwargs={ "normalize_embeddings": True, # 归一化,提升余弦相似度计算效果 "batch_size": 32, # 批处理大小 }, ) # 也可以用 Ollama 本地运行 from langchain_community.embeddings import OllamaEmbeddings embeddings_ollama = OllamaEmbeddings( model="nomic-embed-text", # ollama 支持的 embedding 模型 base_url="http://localhost:11434", )
选型建议:
text-embedding-3-small
BAAI/bge-m3
向量化之后,需要把向量存储起来,支持后续的相似度检索。LangChain 支持多种向量数据库(VectorStore),每种都有不同的特性和适用场景。
FAISS(Facebook AI Similarity Search)是 Meta 开源的高效向量检索库,完全在本地内存中运行,不需要启动任何服务:
from langchain_community.vectorstores import FAISS from langchain_openai import OpenAIEmbeddings from langchain_core.documents import Document embeddings = OpenAIEmbeddings(model="text-embedding-3-small") # 方式一:从 Document 列表直接创建 docs = [ Document(page_content="Python 支持面向对象和函数式编程范式", metadata={"source": "python.txt"}), Document(page_content="JavaScript 是 Web 前端开发的核心语言", metadata={"source": "js.txt"}), Document(page_content="机器学习需要大量标注数据进行训练", metadata={"source": "ml.txt"}), Document(page_content="深度学习是机器学习的子领域,使用神经网络", metadata={"source": "dl.txt"}), Document(page_content="自然语言处理(NLP)是人工智能的重要分支", metadata={"source": "nlp.txt"}), ] # from_documents 会自动完成向量化并构建索引 # 这一步会调用 Embedding API,有网络延迟和费用 vectorstore = FAISS.from_documents(docs, embeddings) # ---- 相似度搜索(最基础的检索方式)---- query = "Python 编程语言的特点" results = vectorstore.similarity_search(query, k=2) # k=返回最相关的 2 个 print("检索结果:") for i, doc in enumerate(results): print(f"\n第 {i+1} 名:") print(f" 内容:{doc.page_content}") print(f" 来源:{doc.metadata['source']}") # ---- 带分数的搜索(可以看到相似度得分)---- results_with_score = vectorstore.similarity_search_with_score(query, k=2) for doc, score in results_with_score: print(f"得分:{score:.4f},内容:{doc.page_content[:50]}") # 得分越低 = 距离越近 = 越相关(FAISS 用 L2 距离) # ---- 持久化:保存到本地,下次直接加载 ---- vectorstore.save_local("faiss_index") # 保存到 faiss_index/ 目录 # 下次直接加载,不需要重新向量化 vectorstore_loaded = FAISS.load_local( "faiss_index", embeddings, allow_dangerous_deserialization=True, # 1.0 版本需要显式允许反序列化 ) # ---- 增量更新:添加新文档 ---- new_docs = [Document(page_content="Go 语言以并发性能著称", metadata={"source": "go.txt"})] vectorstore.add_documents(new_docs) # 注意:add_documents 只在内存中更新,需要再次 save_local 才持久化
Chroma 也是本地运行的,但相比 FAISS,它原生支持持久化存储(SQLite),不需要手动保存/加载:
from langchain_community.vectorstores import Chroma from langchain_openai import OpenAIEmbeddings embeddings = OpenAIEmbeddings(model="text-embedding-3-small") # 创建持久化的 Chroma 向量库 # persist_directory 指定数据存储目录,进程重启后数据自动恢复 vectorstore = Chroma( collection_name="my_knowledge_base", # 集合名称(类似数据库的表名) embedding_function=embeddings, persist_directory="chroma_db", # 数据目录 ) # 添加文档 from langchain_core.documents import Document docs = [ Document(page_content="退款需要在购买后 7 天内申请", metadata={"category": "policy", "version": "2024"}), Document(page_content="会员用户享受 9.5 折优惠", metadata={"category": "vip", "version": "2024"}), Document(page_content="配送时间通常为 3-5 个工作日", metadata={"category": "logistics"}), ] vectorstore.add_documents(docs) # 带元数据过滤的检索(这是 Chroma 相对 FAISS 的优势) results = vectorstore.similarity_search( query="怎么退款", k=2, filter={"category": "policy"} # 只在 category=policy 的文档里搜索 ) # 元数据过滤让你可以把不同类型的文档存在同一个向量库里,检索时按需过滤 print(f"过滤后结果:{len(results)} 条") for doc in results: print(f" {doc.page_content}")
在 LangChain 的 RAG 链中,VectorStore 通常要先转换成 Retriever(检索器)才能方便地接入 LCEL 链:
Retriever
from langchain_community.vectorstores import FAISS from langchain_openai import OpenAIEmbeddings embeddings = OpenAIEmbeddings(model="text-embedding-3-small") vectorstore = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True) # 最简单的转换:直接调用 as_retriever() retriever = vectorstore.as_retriever( search_type="similarity", # 检索方式 search_kwargs={"k": 4}, # 返回前 4 个相关文档 ) # 检索测试 docs = retriever.invoke("Python 的特点是什么?") print(f"检索到 {len(docs)} 个文档") # ---- 更多检索类型 ---- # MMR(最大边际相关性):平衡相关性和多样性,避免返回重复内容 retriever_mmr = vectorstore.as_retriever( search_type="mmr", search_kwargs={ "k": 4, # 最终返回 4 个 "fetch_k": 20, # 先候选 20 个,再从中选多样性最高的 4 个 "lambda_mult": 0.5, # 0=最大多样性,1=最大相关性,0.5 是平衡点 } ) # 相似度阈值过滤:只返回相似度超过阈值的文档 retriever_threshold = vectorstore.as_retriever( search_type="similarity_score_threshold", search_kwargs={"score_threshold": 0.7} # 只返回相似度 ≥ 0.7 的文档 )
有了向量库和检索器,可以把整个 RAG 流程连接成一条 LCEL 链了。
from langchain_community.vectorstores import FAISS from langchain_openai import OpenAIEmbeddings, ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough from langchain_core.documents import Document # ---- 准备知识库 ---- embeddings = OpenAIEmbeddings(model="text-embedding-3-small") docs = [ Document(page_content="本公司退款政策:购买后 7 天内可无理由退款,超过 7 天需提供质量问题证明。退款到账时间为 3-5 个工作日。", metadata={"source": "policy.txt"}), Document(page_content="会员等级说明:普通会员购满 1000 元升级银卡,银卡用户享受 9.5 折优惠;购满 5000 元升级金卡,金卡用户享受 9 折优惠。", metadata={"source": "membership.txt"}), Document(page_content="配送说明:默认顺丰快递,3-5 个工作日送达。偏远地区(西藏、新疆等)需加收运费 15 元,送达时间延长至 7-10 个工作日。", metadata={"source": "shipping.txt"}), Document(page_content="产品质保:所有电子产品享受 1 年免费保修,人为损坏不在保修范围内。保修期间免费维修,超出保修期按成本价收费。", metadata={"source": "warranty.txt"}), ] vectorstore = FAISS.from_documents(docs, embeddings) retriever = vectorstore.as_retriever(search_kwargs={"k": 2}) # ---- RAG Prompt ---- rag_prompt = ChatPromptTemplate.from_messages([ ("system", """你是一个专业的客服助手,请根据提供的参考资料回答用户问题。 参考资料: {context} 回答规则: 1. 只根据参考资料中的信息回答,不要添加资料中没有的内容 2. 如果参考资料中没有相关信息,明确告知用户"根据现有资料,暂无此信息" 3. 回答要简洁准确,必要时引用具体数字或政策条款"""), ("human", "{question}"), ]) # ---- 格式化检索结果的辅助函数 ---- def format_docs(docs: list) -> str: """把检索到的 Document 列表格式化成字符串,注入 Prompt""" formatted = [] for i, doc in enumerate(docs, 1): source = doc.metadata.get("source", "未知来源") formatted.append(f"【资料 {i}】(来源:{source})\n{doc.page_content}") return "\n\n".join(formatted) # ---- 组装 RAG 链 ---- llm = ChatOpenAI(model="gpt-4o", temperature=0) rag_chain = ( # 第一步:从用户问题中提取检索关键信息,同时保留原始问题 {"context": retriever | format_docs, "question": RunnablePassthrough()} # 数据流: # 用户输入 "退款要多久?" # → retriever 接收这个字符串,检索最相关的 2 个 Document # → format_docs 把 Document 列表格式化成字符串 # 同时,RunnablePassthrough 把原始问题原样传给 "question" # 最终得到:{"context": "【资料 1】...", "question": "退款要多久?"} | rag_prompt | llm | StrOutputParser() ) # ---- 测试 ---- questions = [ "退款需要多少天到账?", "金卡会员有什么优惠?", "西藏地区的配送费用是多少?", "产品可以保修多久?", "如何办理会员?", # 知识库里没有这个信息 ] for q in questions: print(f"\n问:{q}") answer = rag_chain.invoke(q) print(f"答:{answer}")
在实际应用中,用户经常需要知道"这个答案的依据是什么"。让 RAG 系统输出答案的同时也输出来源引用:
from langchain_core.runnables import RunnableParallel, RunnablePassthrough from langchain_core.output_parsers import StrOutputParser # 并行运行:同时生成答案和保留原始检索结果 rag_chain_with_source = RunnableParallel( answer=rag_chain, # 单独保留检索到的原始文档,用于后续展示来源 source_documents=retriever, ) result = rag_chain_with_source.invoke("退款政策是什么?") print("答案:") print(result["answer"]) print("\n参考来源:") for doc in result["source_documents"]: source = doc.metadata.get("source", "未知") preview = doc.page_content[:80] print(f" 📄 {source}:{preview}...")
基础 RAG 链搭建起来之后,往往会发现检索质量不理想——有时候问题明明答案在文档里,但就是检索不到;或者检索到了但内容不够准确。这一节介绍几种常用的优化手段。
在多轮对话中,用户经常用"它"、"这个"来指代之前的内容。这种指代词会让检索失效,因为向量数据库不知道"它"指什么:
from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.messages import HumanMessage, AIMessage llm = ChatOpenAI(model="gpt-4o", temperature=0) # 问题重写链:把含有指代的问题改写成独立、完整的问题 rewrite_prompt = ChatPromptTemplate.from_messages([ ("system", """你是一个问题重写助手。 根据对话历史,将用户最新的问题改写为一个不依赖对话上下文的独立完整问题。 只输出改写后的问题,不要任何解释。 如果问题本身已经完整清晰,直接原样输出。"""), ("human", """对话历史: {chat_history} 用户最新问题:{question} 改写后的独立问题:"""), ]) rewrite_chain = rewrite_prompt | llm | StrOutputParser() # 测试 chat_history = [ HumanMessage(content="退款政策是什么?"), AIMessage(content="退款需要在购买后 7 天内申请..."), ] followup_question = "那超过时间了怎么办?" # 把历史格式化成字符串 history_text = "\n".join([ f"{'用户' if isinstance(m, HumanMessage) else '助手'}:{m.content}" for m in chat_history ]) rewritten = rewrite_chain.invoke({ "chat_history": history_text, "question": followup_question, }) print(f"原始问题:{followup_question}") print(f"改写后:{rewritten}") # 改写后:如果超过购买后 7 天的退款期限,应该如何处理? # 用改写后的问题去检索,效果明显更好
单一检索可能因为用词不同而错过相关文档。用多种方式提问,然后合并结果:
from langchain_core.runnables import RunnableParallel from langchain_openai import ChatOpenAI from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate llm = ChatOpenAI(model="gpt-4o", temperature=0.3) # 生成同义改写的多个版本 multi_query_prompt = ChatPromptTemplate.from_messages([ ("system", """你是一个搜索助手,帮助用户从不同角度表达同一个问题,以提升搜索效果。 请生成 3 个语义相同但表达方式不同的问题变体,每行一个,不要编号。 变体要涵盖不同的关键词和表达方式,但意思完全相同。"""), ("human", "原始问题:{question}"), ]) def generate_queries(question: str) -> list[str]: """生成多个问题变体""" response = (multi_query_prompt | llm | StrOutputParser()).invoke({"question": question}) variants = [q.strip() for q in response.strip().split("\n") if q.strip()] return [question] + variants[:3] # 原始问题 + 最多 3 个变体 def multi_query_retrieve(question: str, retriever, k: int = 2) -> list: """多路检索:用多个问题版本检索,合并去重""" queries = generate_queries(question) print(f"检索使用的问题变体:{queries}") all_docs = [] seen_contents = set() for query in queries: docs = retriever.invoke(query) for doc in docs: # 去重:避免同一内容被多次检索后重复添加 if doc.page_content not in seen_contents: seen_contents.add(doc.page_content) all_docs.append(doc) return all_docs[:k * 2] # 返回不超过 k*2 个文档 # 使用 original_question = "货物损坏了能退吗" retrieved_docs = multi_query_retrieve(original_question, retriever) print(f"\n多路检索共找到 {len(retrieved_docs)} 个不重复文档")
向量检索用相似度来排序,但相似度高不等于真正有用。用一个专门的重排序模型(Reranker)对初步检索的结果重新打分,可以显著提升最终质量:
# 方案:用 Cross-Encoder 重排序(比 Bi-Encoder 精度更高,但速度更慢) # 先用向量检索快速候选 20 个,再用 Reranker 精选 4 个 from langchain_core.documents import Document def rerank_documents(query: str, docs: list, top_n: int = 4) -> list: """ 用语言模型重新评估每个文档和问题的相关性并排序。 这是一个简化版本,实际生产中推荐使用专门的 Reranker 模型。 """ from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser import concurrent.futures llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) # 用小模型降低成本 score_prompt = ChatPromptTemplate.from_messages([ ("system", "评估以下文档对于回答给定问题的相关性,输出 JSON 格式:{\"score\": 0-10}"), ("human", "问题:{query}\n\n文档:{doc_content}"), ]) score_chain = score_prompt | llm | JsonOutputParser() # 并行评分(降低延迟) def score_doc(doc): try: result = score_chain.invoke({"query": query, "doc_content": doc.page_content}) return doc, result.get("score", 0) except Exception: return doc, 0 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: scored = list(executor.map(score_doc, docs)) # 按分数降序排列,取前 top_n 个 scored.sort(key=lambda x: x[1], reverse=True) return [doc for doc, score in scored[:top_n]] # 在 RAG 链中集成 Reranker def enhanced_retrieve(question: str) -> list: # 第一步:向量检索候选 10 个 candidates = retriever_for_rerank = vectorstore.as_retriever( search_kwargs={"k": 10} ).invoke(question) # 第二步:重排序取最好的 4 个 return rerank_documents(question, candidates, top_n=4)
把本篇所有知识综合运用,构建一个完整的、可以直接使用的本地文档问答系统:
import os from pathlib import Path from typing import List, Optional from dotenv import load_dotenv from langchain_community.document_loaders import PyPDFLoader, TextLoader, DirectoryLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_openai import OpenAIEmbeddings, ChatOpenAI from langchain_community.vectorstores import FAISS from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough, RunnableParallel from langchain_core.chat_history import InMemoryChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_core.messages import HumanMessage, AIMessage from langchain_core.documents import Document load_dotenv() # ============================================================ # 文档索引构建器 # ============================================================ class DocumentIndexBuilder: """负责文档的加载、分割和向量化入库""" def __init__( self, embeddings_model: str = "text-embedding-3-small", chunk_size: int = 600, chunk_overlap: int = 80, index_path: str = "local_faiss_index", ): self.embeddings = OpenAIEmbeddings(model=embeddings_model) self.splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?", " ", ""], ) self.index_path = index_path self.vectorstore: Optional[FAISS] = None def load_documents(self, source_path: str) -> List[Document]: """加载文档(支持文件和目录)""" path = Path(source_path) docs = [] if path.is_file(): suffix = path.suffix.lower() if suffix == ".pdf": loader = PyPDFLoader(str(path)) elif suffix in [".txt", ".md"]: loader = TextLoader(str(path), encoding="utf-8") else: print(f"不支持的文件格式:{suffix}") return [] docs = loader.load() print(f"加载文件:{path.name},共 {len(docs)} 页/段") elif path.is_dir(): for file in path.rglob("*"): if file.suffix.lower() in [".pdf", ".txt", ".md"]: sub_docs = self.load_documents(str(file)) docs.extend(sub_docs) print(f"从目录加载:{path},共 {len(docs)} 个文档片段") return docs def build_index(self, source_path: str) -> FAISS: """构建向量索引(加载 → 分割 → 向量化 → 存储)""" print(f"\n{'='*50}") print(f"开始构建索引:{source_path}") # 第一步:加载 print("\n[1/3] 加载文档...") raw_docs = self.load_documents(source_path) if not raw_docs: raise ValueError(f"未能从 {source_path} 加载任何文档") # 第二步:分割 print(f"\n[2/3] 分割文档(chunk_size={self.splitter._chunk_size})...") chunks = self.splitter.split_documents(raw_docs) print(f"分割完成:{len(raw_docs)} 个原始文档 → {len(chunks)} 个 Chunk") # 打印分割统计 lengths = [len(c.page_content) for c in chunks] print(f"Chunk 长度统计:最短 {min(lengths)} 字,最长 {max(lengths)} 字,平均 {sum(lengths)//len(lengths)} 字") # 第三步:向量化并存储 print(f"\n[3/3] 向量化并构建索引(共 {len(chunks)} 个 Chunk)...") self.vectorstore = FAISS.from_documents(chunks, self.embeddings) # 持久化 self.vectorstore.save_local(self.index_path) print(f"\n✅ 索引构建完成,已保存至:{self.index_path}") print(f"{'='*50}\n") return self.vectorstore def load_index(self) -> FAISS: """加载已有索引""" if not Path(self.index_path).exists(): raise FileNotFoundError(f"索引不存在:{self.index_path},请先调用 build_index()") self.vectorstore = FAISS.load_local( self.index_path, self.embeddings, allow_dangerous_deserialization=True ) print(f"✅ 已加载索引:{self.index_path}") return self.vectorstore def get_or_build(self, source_path: str) -> FAISS: """如果索引已存在则直接加载,否则重新构建""" if Path(self.index_path).exists(): return self.load_index() return self.build_index(source_path) # ============================================================ # 问答引擎 # ============================================================ class RAGChatEngine: """基于 RAG 的多轮问答引擎,带来源引用""" def __init__(self, vectorstore: FAISS, model: str = "gpt-4o"): self.retriever = vectorstore.as_retriever( search_type="mmr", # 使用 MMR 保证多样性 search_kwargs={"k": 4, "fetch_k": 12}, ) self.llm = ChatOpenAI(model=model, temperature=0) self.session_store = {} self._build_chain() def _build_chain(self): """构建带历史的 RAG 链""" # 问题重写链:处理多轮对话中的指代问题 rewrite_prompt = ChatPromptTemplate.from_messages([ ("system", "根据对话历史,将用户问题改写为独立完整的问题。只输出改写后的问题。"), MessagesPlaceholder("chat_history"), ("human", "将此问题改写为独立问题:{input}"), ]) self.rewrite_chain = rewrite_prompt | self.llm | StrOutputParser() # RAG 主链 rag_prompt = ChatPromptTemplate.from_messages([ ("system", """你是一个专业的文档问答助手。 请根据以下参考资料回答用户问题: {context} --- 重要规则: - 只使用参考资料中的信息,不要添加资料以外的内容 - 如果资料中没有相关信息,直接说"根据现有文档,暂无此信息" - 回答后,在最后用"📄 参考来源:"标注使用的文档来源"""), MessagesPlaceholder("chat_history"), ("human", "{input}"), ]) def get_context_and_sources(input_dict: dict): """检索并格式化上下文""" # 如果有历史,先重写问题 if input_dict.get("chat_history"): standalone_question = self.rewrite_chain.invoke(input_dict) else: standalone_question = input_dict["input"] # 检索 docs = self.retriever.invoke(standalone_question) # 格式化 context_parts = [] sources = [] for i, doc in enumerate(docs, 1): source = doc.metadata.get("source", "未知来源") page = doc.metadata.get("page", "") source_str = f"{source}" + (f" 第{page+1}页" if page != "" else "") context_parts.append(f"【资料{i}】{source_str}\n{doc.page_content}") if source_str not in sources: sources.append(source_str) return { "context": "\n\n".join(context_parts), "input": input_dict["input"], "chat_history": input_dict.get("chat_history", []), "_sources": sources, # 临时保存来源列表 } from langchain_core.runnables import RunnableLambda self.chain = ( RunnableLambda(get_context_and_sources) | rag_prompt | self.llm | StrOutputParser() ) # 包装带历史 def get_session_history(session_id: str) -> InMemoryChatMessageHistory: if session_id not in self.session_store: self.session_store[session_id] = InMemoryChatMessageHistory() return self.session_store[session_id] self.chain_with_history = RunnableWithMessageHistory( self.chain, get_session_history, input_messages_key="input", history_messages_key="chat_history", ) def chat(self, question: str, session_id: str = "default") -> str: """发起一次问答""" config = {"configurable": {"session_id": session_id}} return self.chain_with_history.invoke({"input": question}, config=config) def stream_chat(self, question: str, session_id: str = "default"): """流式输出""" config = {"configurable": {"session_id": session_id}} for chunk in self.chain_with_history.stream({"input": question}, config=config): yield chunk def clear_history(self, session_id: str = "default"): """清空对话历史""" if session_id in self.session_store: self.session_store[session_id].clear() # ============================================================ # 主程序:演示用法 # ============================================================ def demo(): # 准备示例文档(实际使用时替换为真实文档路径) demo_docs_dir = Path("demo_docs") demo_docs_dir.mkdir(exist_ok=True) # 创建示例文本文件 sample_content = """ 公司知识库 一、退款政策 购买后 7 天内可申请无理由退款。质量问题可在 30 天内申请退换货。 退款到账时间:支付宝/微信 1-3 个工作日,银行卡 3-5 个工作日。 申请方式:登录官网 → 我的订单 → 申请退款。 二、会员体系 普通会员:注册即获得,享受生日双倍积分。 银卡会员:累计消费满 1000 元,享受 9.5 折优惠、每月免费快递。 金卡会员:累计消费满 5000 元,享受 9 折优惠、专属客服、优先发货。 三、配送说明 标准配送:顺丰快递,3-5 个工作日。 急速配送:京东物流,次日达(部分城市)。 偏远地区(西藏、新疆、海南离岛):7-15 个工作日,加收运费 20 元。 免费配送条件:单笔订单满 99 元。 四、产品质保 电子产品:购买之日起 1 年免费保修,包含正常使用损耗。 非电子产品:30 天质量问题免费换货。 不在保修范围:人为损坏、进水、改装。 """ (demo_docs_dir / "company_knowledge.txt").write_text(sample_content, encoding="utf-8") # 构建索引 builder = DocumentIndexBuilder(index_path="demo_faiss_index") vectorstore = builder.get_or_build(str(demo_docs_dir)) # 创建问答引擎 engine = RAGChatEngine(vectorstore) # 模拟多轮对话 session = "demo_session" print("\n" + "="*60) print("本地文档问答系统启动") print("="*60) test_questions = [ "退款一般需要多久到账?", "那质量有问题呢?", # 测试多轮:指代"退款" "金卡会员有什么特权?", "西藏地区配送要多久?", "电子产品的保修期是多久?", "积分怎么获得?", # 测试知识库中没有的信息 ] for question in test_questions: print(f"\n🙋 用户:{question}") print("🤖 助手:", end="") # 流式输出 for chunk in engine.stream_chat(question, session_id=session): print(chunk, end="", flush=True) print() if __name__ == "__main__": demo()
{context: retriever | format_docs, question: RunnablePassthrough()}
你现在应该能做到: - 从零搭建一个完整的 RAG 系统,支持 PDF、TXT、网页等多种文档格式 - 理解 chunk_size 和 chunk_overlap 的调优思路 - 选择合适的 VectorStore,并正确配置 Retriever - 构建带历史记忆和来源引用的生产级 RAG 问答系统 - 用问题重写、多路检索等手段提升检索质量
《LangGraph 初探——构建复杂的有状态智能体》
RAG 和 Agent 都已掌握,第十篇将介绍突破 LCEL 线性限制的高阶框架——LangGraph:
代码仓库:本系列所有可运行代码示例统一维护在 GitHub,每篇对应独立目录,可直接克隆运行。 系列导航:[第一篇] → ... → [第八篇] → 第九篇(当前) → 第十篇 → ...
代码仓库:本系列所有可运行代码示例统一维护在 GitHub,每篇对应独立目录,可直接克隆运行。
系列导航:[第一篇] → ... → [第八篇] → 第九篇(当前) → 第十篇 → ...
还没有评论,来抢沙发吧!
博客管理员
40 篇文章
还没有评论,来抢沙发吧!