系列导读:本文是「智答」RAG 智能问答系统开发系列的第二篇。上一篇我们用 30 行代码跑通了 RAG 的最简链路,用的是极度简化的文档处理方案。本篇深入离线链路的上半段:如何把真实世界里格式各异、质量参差的原始文档,加工成干净、结构合理的知识片段——这是整个 RAG 系统质量的真正地基。
在正式写代码之前,我想先讲一个反直觉的观点:RAG 系统最终的回答质量,很大程度上不是由你选的大模型决定的,而是由你的文档处理质量决定的。
这个结论听起来有点悲观,但它背后有严格的逻辑支撑。
想象你是一名顾问,要回答客户关于某项目的问题。你的助手给你准备了两份参考资料:
版本 A(未处理的原始文档):
第3章 系统架构...........................................................12 1. 概 述 本 系 统 采 用 微 服 务 架 构 设 计,主 要 包 含 以 下 模 块: ●用户管理模块(详见4.1节) ● 数据处理模 块(详见4.2节) Copyright © 2023 ACME Corp. All rights reserved. 页码:12/89
版本 B(清洗后的文档):
系统采用微服务架构设计,主要包含用户管理模块和数据处理模块。
你会基于哪份资料给客户更准确的答复?答案显而易见。
但现实中的文档比这更复杂:PDF 转文本时产生的乱码、PPT 里嵌套的图片说明文字、Word 文档里反复出现的页眉页脚、Markdown 里的 HTML 标签……这些噪声如果不处理,会被原封不动地向量化进索引库,在检索时带来干扰,在生成时被 LLM 引用为"参考资料"。
软件工程里有一个经典原则叫 GIGO:Garbage In, Garbage Out(垃圾进,垃圾出)。在 RAG 系统里,这个原则的表现是:
文档处理的质量,是 RAG 系统能力的真实上限。再好的检索算法,也无法从低质量的片段里召回高质量的答案。
本篇要实现的是离线链路的前两个阶段:
原始文档 (PDF/MD/TXT/PPT) │ ▼ ┌─────────────────┐ │ 文档加载 │ ← 本篇第3节:各类 Loader │ (Document) │ └────────┬────────┘ │ ▼ ┌─────────────────┐ │ 文本清洗 │ ← 本篇第4节:清洗管道 │ (Clean Text) │ └────────┬────────┘ │ ▼ ┌─────────────────┐ │ 文档切分 │ ← 本篇第5、6节:父子切分策略 │ (Chunks) │ └────────┬────────┘ │ ▼ [交给第3篇:向量化与存储]
本篇结束时,我们会把这三个阶段封装成两个模块文件:core/loader.py 和 core/splitter.py,可以直接插入最终项目。
core/loader.py
core/splitter.py
LangChain 的文档加载器(Document Loader)负责把各种格式的文件读取成统一的 Document 对象。Document 对象有两个核心字段:
Document
from langchain_core.documents import Document doc = Document( page_content="这是文档的文本内容", # 实际文本 metadata={ # 元数据(来源、页码等) "source": "data/raw/manual.pdf", "page": 3, "file_type": "pdf", } )
metadata 是一个自由字典,你可以往里放任何你需要的信息。这些元数据在后续检索时可以作为过滤条件,也可以在生成回答时作为引用来源展示给用户。
metadata
这是最简单的情况:
# core/loader.py(初始版本,后续会扩展) from langchain_community.document_loaders import ( TextLoader, UnstructuredMarkdownLoader, ) # TXT 文件 def load_txt(file_path: str) -> list: loader = TextLoader(file_path, encoding="utf-8") docs = loader.load() # TextLoader 会把整个文件作为一个 Document 返回 # 所以 docs 通常只有 1 个元素 return docs # Markdown 文件 def load_markdown(file_path: str) -> list: # UnstructuredMarkdownLoader 会解析 Markdown 结构 # mode="elements" 会把不同类型的块(标题、段落、列表)分别返回 loader = UnstructuredMarkdownLoader( file_path, mode="elements", # 按元素分割,保留结构信息 ) docs = loader.load() return docs
安装 Markdown 加载器所需的额外依赖:
pip install unstructured[md]
PDF 是最常见也最麻烦的格式。麻烦在于 PDF 本质上是一个"排版指令集",而不是结构化的文本格式,从中提取文本会丢失大量格式信息。
LangChain 提供了几种 PDF Loader,适用场景各不相同:
# 方案一:PyPDFLoader(最常用,按页切分) from langchain_community.document_loaders import PyPDFLoader def load_pdf_by_page(file_path: str) -> list: """ PyPDFLoader 会把每一页作为一个独立的 Document。 优点:实现简单,速度快。 缺点:一个概念可能跨页,按页切分会破坏语义连续性。 适用:页面结构清晰、每页相对独立的文档(如规章制度、手册)。 """ loader = PyPDFLoader(file_path) docs = loader.load() # 每个 doc 的 metadata 包含 page 字段(从0开始) return docs # 方案二:PDFMinerLoader(文本提取质量更高) from langchain_community.document_loaders import PDFMinerLoader def load_pdf_high_quality(file_path: str) -> list: """ PDFMiner 的文本提取质量通常优于 PyPDF, 对复杂排版(双栏、表格密集)的处理更好。 适用:对提取质量要求较高的场景。 """ loader = PDFMinerLoader(file_path) docs = loader.load() return docs # 方案三:UnstructuredPDFLoader(结构感知,支持图文混排) from langchain_community.document_loaders import UnstructuredPDFLoader def load_pdf_structured(file_path: str) -> list: """ Unstructured 可以识别 PDF 里的标题、段落、表格等结构, 并在 metadata 里标注元素类型('Title', 'NarrativeText', 'Table'等)。 适用:需要保留文档结构信息,或处理包含表格的文档。 需要安装:pip install unstructured[pdf] """ loader = UnstructuredPDFLoader( file_path, mode="elements", strategy="hi_res", # 高精度模式(较慢),也可用 "fast" ) docs = loader.load() return docs
实际项目的选择建议:对于大多数技术文档,PyPDFLoader 足够用,速度快且稳定。如果文档包含大量表格或复杂排版,切换到 UnstructuredPDFLoader。
PyPDFLoader
UnstructuredPDFLoader
安装依赖:
pip install pypdf # PyPDFLoader pip install pdfminer.six # PDFMinerLoader pip install unstructured[pdf] # UnstructuredPDFLoader(较重,按需安装)
无论使用哪种 Loader,加载完成后都应该统一补充元数据,方便后续的过滤检索和来源引用:
import os from datetime import datetime from langchain_core.documents import Document def enrich_metadata(docs: list[Document], file_path: str) -> list[Document]: """ 统一补充元数据: - source: 文件路径 - file_name: 文件名 - file_type: 文件后缀 - ingested_at: 入库时间戳 """ file_name = os.path.basename(file_path) file_type = os.path.splitext(file_name)[1].lower().lstrip(".") ingested_at = datetime.now().isoformat() for doc in docs: doc.metadata.update({ "source": file_path, "file_name": file_name, "file_type": file_type, "ingested_at": ingested_at, }) return docs
在实际项目里,你不会知道用户会上传什么格式的文件,需要一个能自动路由的入口函数:
import os from pathlib import Path from langchain_core.documents import Document from langchain_community.document_loaders import ( TextLoader, PyPDFLoader, UnstructuredMarkdownLoader, ) # 支持的文件格式映射 SUPPORTED_EXTENSIONS = { ".txt": "text", ".md": "markdown", ".markdown": "markdown", ".pdf": "pdf", } def load_document(file_path: str) -> list[Document]: """ 根据文件后缀自动选择合适的 Loader,统一返回 Document 列表。 Args: file_path: 文件路径 Returns: Document 列表,每个 Document 包含 page_content 和 metadata Raises: ValueError: 不支持的文件格式 FileNotFoundError: 文件不存在 """ path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"文件不存在:{file_path}") ext = path.suffix.lower() if ext not in SUPPORTED_EXTENSIONS: raise ValueError( f"不支持的文件格式:{ext}。" f"支持的格式:{list(SUPPORTED_EXTENSIONS.keys())}" ) file_type = SUPPORTED_EXTENSIONS[ext] if file_type == "text": loader = TextLoader(file_path, encoding="utf-8") docs = loader.load() elif file_type == "markdown": try: loader = UnstructuredMarkdownLoader(file_path, mode="elements") docs = loader.load() except Exception: # unstructured 不可用时回退到 TextLoader loader = TextLoader(file_path, encoding="utf-8") docs = loader.load() elif file_type == "pdf": loader = PyPDFLoader(file_path) docs = loader.load() # 统一补充元数据 docs = enrich_metadata(docs, file_path) return docs def load_directory(dir_path: str) -> list[Document]: """ 递归加载目录下所有支持格式的文件。 Args: dir_path: 目录路径 Returns: 所有文档的 Document 列表 """ all_docs = [] dir_path = Path(dir_path) for ext in SUPPORTED_EXTENSIONS.keys(): files = list(dir_path.rglob(f"*{ext}")) for file_path in files: try: docs = load_document(str(file_path)) all_docs.extend(docs) print(f" ✅ 加载成功:{file_path.name}({len(docs)} 个块)") except Exception as e: print(f" ❌ 加载失败:{file_path.name} - {e}") print(f"\n 共加载 {len(all_docs)} 个文档块,来自 {dir_path}") return all_docs
加载完文档之后,page_content 里的内容往往还很"脏"。清洗的目标是:在不损失信息的前提下,去除对语义理解没有帮助的噪声。
page_content
在处理真实业务文档时,你会遇到以下典型噪声:
类型一:格式符号残留
PDF 转文本后常见: "本系统采用微●服●务●架●构" (字符间插入了多余符号) "第 3 章 系 统 架 构" (字符之间有大量空格)
类型二:页眉页脚
"Confidential - Internal Use Only" "Page 12 of 89" "© 2023 ACME Corporation. All Rights Reserved."
类型三:无意义分隔符和空白
"----------------------------" "\n\n\n\n\n" " " (大量空格)
类型四:目录和引用信息
"详见第4.2节" "如图3-1所示" "参考附录B"
类型五:Markdown 语法符号(如果不需要保留格式)
"## 3.2 系统架构" → 可能需要保留标题层级信息 "**重要**" → 加粗符号 "[链接文字](url)" → 链接
清洗管道的设计原则是"组合式"——每个清洗操作是一个独立函数,通过管道顺序执行,方便针对不同文档类型启用或禁用特定步骤:
# core/loader.py(清洗部分) import re from langchain_core.documents import Document from typing import Callable # ── 基础清洗函数 ────────────────────────────────────────────── def remove_extra_whitespace(text: str) -> str: """ 去除多余的空白字符: - 连续空格压缩为单个空格 - 连续换行超过2个的压缩为2个 - 去除行尾空格 """ # 去除行尾空格 text = re.sub(r'[ \t]+$', '', text, flags=re.MULTILINE) # 连续空格压缩 text = re.sub(r' {2,}', ' ', text) # 连续换行压缩(保留最多2个) text = re.sub(r'\n{3,}', '\n\n', text) return text.strip() def remove_page_markers(text: str) -> str: """ 去除页码、页眉页脚等页面标记。 覆盖常见的中英文格式。 """ patterns = [ r'[-─━]+\s*第\s*\d+\s*页\s*[-─━]*', # ── 第3页 ── r'第\s*\d+\s*页\s*[/共]\s*\d+\s*页', # 第3页/共89页 r'Page\s+\d+\s*(?:of\s+\d+)?', # Page 12 of 89 r'^\d+\s*$', # 单独的页码行 r'[-─━=]{10,}', # 长分隔线 ] for pattern in patterns: text = re.sub(pattern, '', text, flags=re.MULTILINE | re.IGNORECASE) return text def remove_copyright_notices(text: str) -> str: """ 去除版权声明、保密标记等法律声明文本。 """ patterns = [ r'©.*?版权所有.*?\n', r'Copyright\s*©.*?\n', r'All\s+[Rr]ights?\s+[Rr]eserved\.?.*?\n', r'Confidential\s*[-–]\s*.*?\n', r'仅供内部使用.*?\n', r'内部资料.*?请勿外传.*?\n', ] for pattern in patterns: text = re.sub(pattern, '', text, flags=re.IGNORECASE) return text def remove_table_of_contents_lines(text: str) -> str: """ 去除目录中的点线填充(如"第一章 概述......1")。 这类内容只在目录里有意义,作为知识片段没有价值。 """ # 匹配:文字 + 大量点/空格 + 数字(目录行特征) text = re.sub(r'^.+[.·\s]{5,}\d+\s*$', '', text, flags=re.MULTILINE) return text def normalize_chinese_punctuation(text: str) -> str: """ 统一中文标点符号(可选步骤)。 部分 PDF 提取后会把全角符号变成半角,或者混用。 """ replacements = { ',': ',', # 确保是全角逗号 '。': '。', ';': ';', # 有时 PDF 提取会产生这些异常字符 '\x00': '', # null 字符 '\ufffd': '', # 替换字符(乱码) '\u200b': '', # 零宽空格 } for old, new in replacements.items(): text = text.replace(old, new) return text def remove_lone_special_chars(text: str) -> str: """ 去除单独成行的特殊字符(如单独的 ● ■ ▶ 等)。 """ text = re.sub(r'^[●■▶►▷○◆◇★☆•·]+\s*$', '', text, flags=re.MULTILINE) return text # ── 管道执行器 ──────────────────────────────────────────────── # 预定义清洗管道(可根据文档类型选择) CLEAN_PIPELINE_STANDARD = [ remove_extra_whitespace, remove_page_markers, remove_copyright_notices, remove_table_of_contents_lines, normalize_chinese_punctuation, remove_lone_special_chars, remove_extra_whitespace, # 清洗后再过一遍,去除清洗产生的多余空行 ] CLEAN_PIPELINE_MINIMAL = [ remove_extra_whitespace, normalize_chinese_punctuation, ] def clean_text( text: str, pipeline: list[Callable] = CLEAN_PIPELINE_STANDARD ) -> str: """ 对文本应用清洗管道中的每个清洗函数。 Args: text: 待清洗的原始文本 pipeline: 清洗函数列表,按顺序执行 Returns: 清洗后的文本 """ for clean_fn in pipeline: text = clean_fn(text) return text def clean_documents( docs: list[Document], pipeline: list[Callable] = CLEAN_PIPELINE_STANDARD, min_length: int = 20, ) -> list[Document]: """ 对文档列表中的每个 Document 执行清洗, 并过滤掉清洗后内容过短(可能是空文档)的块。 Args: docs: 待清洗的文档列表 pipeline: 清洗管道 min_length: 清洗后内容最小字符数,低于此值的文档会被过滤 Returns: 清洗后的文档列表 """ cleaned = [] for doc in docs: cleaned_text = clean_text(doc.page_content, pipeline) if len(cleaned_text) >= min_length: doc.page_content = cleaned_text cleaned.append(doc) filtered_count = len(docs) - len(cleaned) if filtered_count > 0: print(f" 过滤掉 {filtered_count} 个过短文档块(< {min_length} 字符)") return cleaned
写完清洗管道,应该立即验证效果。创建 scripts/test_cleaner.py:
scripts/test_cleaner.py
# scripts/test_cleaner.py """ 清洗效果验证脚本 运行:python scripts/test_cleaner.py """ import sys sys.path.append(".") from core.loader import clean_text, CLEAN_PIPELINE_STANDARD # 模拟真实 PDF 提取后的噪声文本 dirty_texts = [ { "name": "带页码的技术文档", "text": """ 3.1 系统架构设计 本 系 统 采 用 微 服 务 架 构 设 计,主要包含以下三个核心模块。 第 12 页 / 共 89 页 © 2023 ACME Corporation. All Rights Reserved. Confidential - Internal Use Only 3.1.1 用户管理模块 用户管理模块负责处理所有与用户身份认证相关的业务逻辑。 详细设计请参考第 4.2 节。 """ }, { "name": "带目录行的文档", "text": """ 目 录 第一章 系统概述........................................1 第二章 技术架构......................................12 第三章 部署方案......................................24 第一章 系统概述 本章介绍系统的整体设计思路和技术选型。 """ }, { "name": "Markdown 文档(正常,最小化清洗)", "text": """ ## 2.1 安装步骤 使用以下命令安装依赖: ```bash pip install -r requirements.txt ``` 安装完成后,运行验证脚本确认环境正常。 """ }, ] for sample in dirty_texts: print(f"\n{'='*60}") print(f"样本:{sample['name']}") print(f"{'='*60}") print(f"【清洗前】({len(sample['text'])} 字符)") print(sample['text'][:300]) cleaned = clean_text(sample['text']) print(f"\n【清洗后】({len(cleaned)} 字符)") print(cleaned[:300])
运行:
python scripts/test_cleaner.py
观察每种噪声是否被正确去除,同时确认正常内容没有被误伤。
文档加载和清洗之后,下一步是切分(Chunking)。这是整个离线链路里工程判断最密集的环节,切分策略的好坏直接影响检索质量。
一个完整的技术手册可能有几万字,不能整体作为一个 Document 存入向量库,原因有三:
但切分也不能无脑切细,太小的切片会导致语义不完整——这是最常见的 RAG 质量问题之一。
最简单的切分策略是按固定字符数切分,这就是第一篇 Demo 里用的方式:
# 第一篇 Demo 的做法(有问题) text_splitter = RecursiveCharacterTextSplitter( chunk_size=300, chunk_overlap=50, )
这种方式的问题是不感知语义边界,容易产生这样的切片:
切片 A(到了 300 字符截止): "...用户管理模块负责处理所有与用户身份认证相关的业务逻辑, 包括登录、注册、权限管理等功能。该模块采用 JWT 认证机制, 支持单点登录(SSO)。数据处理模块" ← 在这里切断了 切片 B: "负责接收和处理来自各业务系统的数据,包括数据清洗、 格式转换和持久化存储。"
用户问"数据处理模块有什么功能",检索到的切片 B 开头是"负责接收和处理……",缺少主语,语义不完整,LLM 可能无法正确解读。
RecursiveCharacterTextSplitter 的名字里有 "Recursive"(递归),这是它的核心机制:
RecursiveCharacterTextSplitter
第一轮:尝试在 "\n\n"(段落)处切分 └─ 如果某段仍然太长,进入第二轮 第二轮:尝试在 "\n"(换行)处切分 └─ 如果某行仍然太长,进入第三轮 第三轮:尝试在 "。""!""?" 处切分 └─ 如果某句仍然太长,进入第四轮 第四轮:按字符强制切分(最后兜底)
通过合理配置分隔符优先级,可以尽量在自然语义边界处切分:
from langchain.text_splitter import RecursiveCharacterTextSplitter splitter = RecursiveCharacterTextSplitter( chunk_size=512, chunk_overlap=64, separators=[ "\n## ", # Markdown 二级标题(最高优先级) "\n### ", # Markdown 三级标题 "\n\n", # 段落分隔 "\n", # 换行 "。", # 中文句号 "!", # 中文感叹号 "?", # 中文问号 ". ", # 英文句号 " ", # 空格 "", # 字符级(最低优先级,兜底) ], length_function=len, # 用字符数计算长度 )
这已经比固定切分好很多了,但还有一个没解决的问题:切片太小,丢失了上下文。
这是本篇最重要的内容,也是「智答」项目相比简单 RAG Demo 最显著的质量提升点。
在切片大小的选择上,存在一个根本性的矛盾:
小切片(< 200 字符) - ✅ 检索精度高:内容集中,向量语义纯净,和用户问题的相关性判断更准确 - ❌ 上下文不完整:LLM 拿到的片段太短,缺乏背景信息,容易误解
大切片(> 1000 字符) - ✅ 上下文完整:LLM 有足够的背景信息理解片段的含义 - ❌ 检索精度低:内容杂,向量被"稀释",可能召回不相关的内容
举个具体例子。假设用户问"JWT 的过期时间怎么设置":
有没有鱼和熊掌兼得的方案?有——父子文档切分。
父子文档切分的思路很简单:
原始文档 │ ▼ ┌──────────────────────────────────────────────┐ │ 父文档(~1500字) │ │ "用户管理模块采用 JWT 认证机制... │ │ 支持单点登录(SSO)... │ │ Token 默认过期时间为 24 小时... │ │ 可通过配置文件调整 TOKEN_EXPIRE 参数..." │ └──────────────────────────────────────────────┘ │ │ │ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ 子文档1(~200字)│ │ 子文档2(~200字)│ │ 子文档3(~200字)│ │"采用JWT认证..." │ │"支持SSO..." │ │"Token默认24h..." │ └──────────────┘ └──────────────┘ └──────────────┘ ↓存向量库 ↓存向量库 ↓存向量库 用户问:"JWT 过期时间怎么设置" → 子文档3 向量相似度最高 → 命中 → 返回子文档3 对应的父文档(1500字,包含完整上下文) → LLM 基于完整父文档生成答案 ✅
LangChain 1.0 提供了开箱即用的 ParentDocumentRetriever,内置了父子文档的存储和检索逻辑。
ParentDocumentRetriever
它需要两个存储: - 向量存储(VectorStore):存子文档的向量,用于相似度搜索 - 文档存储(InMemoryStore / Redis):存父文档的原文,用于检索后的文本返回
# 先了解 ParentDocumentRetriever 的完整用法 from langchain.retrievers import ParentDocumentRetriever from langchain.storage import InMemoryStore from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import Milvus # 父文档切分器(大块) parent_splitter = RecursiveCharacterTextSplitter( chunk_size=1500, chunk_overlap=100, separators=["\n## ", "\n### ", "\n\n", "\n", "。", " ", ""], ) # 子文档切分器(小块,用于向量化) child_splitter = RecursiveCharacterTextSplitter( chunk_size=300, chunk_overlap=30, separators=["\n\n", "\n", "。", "!", "?", " ", ""], ) # 文档存储(存父文档原文) # InMemoryStore 用于开发测试,生产环境建议换成 RedisStore docstore = InMemoryStore() # 向量存储(存子文档向量),这里需要先有 embeddings 对象 # (embeddings 将在第3篇详细介绍,这里先用占位符) # vectorstore = Milvus(embedding_function=embeddings, ...) # 组装 ParentDocumentRetriever retriever = ParentDocumentRetriever( vectorstore=vectorstore, docstore=docstore, child_splitter=child_splitter, parent_splitter=parent_splitter, ) # 添加文档(自动完成父子切分、向量化、存储) retriever.add_documents(docs) # 检索(自动用子文档检索,返回父文档) results = retriever.invoke("JWT 过期时间怎么设置") # results 里的每个 Document 是父文档,内容完整
现在我们把切分逻辑封装成 core/splitter.py,这个模块设计成同时支持"简单切分"和"父子切分"两种模式,方便在开发阶段灵活切换:
# core/splitter.py """ 文档切分模块 提供两种切分策略: 1. SimpleSplitter:普通递归切分,适合快速实验 2. ParentChildSplitter:父子切分,生产推荐方案 使用方式: from core.splitter import ParentChildSplitter splitter = ParentChildSplitter() # add_documents 在第3篇配合 VectorStore 使用 # 本篇提供 split_for_inspection 用于验证切分效果 """ from dataclasses import dataclass, field from langchain_core.documents import Document from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.retrievers import ParentDocumentRetriever from langchain.storage import InMemoryStore # 中英文混合场景的推荐分隔符(按优先级排序) DEFAULT_SEPARATORS = [ "\n## ", # Markdown H2 标题 "\n### ", # Markdown H3 标题 "\n#### ", # Markdown H4 标题 "\n\n", # 空行(段落分隔) "\n", # 换行 "。", # 中文句号 "!", # 中文感叹号 "?", # 中文问号 ";", # 中文分号 ". ", # 英文句号(后接空格) "! ", "? ", "; ", " ", # 空格 "", # 字符级(兜底) ] @dataclass class SplitterConfig: """切分参数配置""" # 父文档参数 parent_chunk_size: int = 1500 parent_chunk_overlap: int = 100 # 子文档参数 child_chunk_size: int = 300 child_chunk_overlap: int = 30 # 分隔符 separators: list = field(default_factory=lambda: DEFAULT_SEPARATORS) class SimpleSplitter: """ 简单递归切分器。 适合:快速实验、文档量小、对检索质量要求不高的场景。 """ def __init__(self, config: SplitterConfig = None): self.config = config or SplitterConfig() self._splitter = RecursiveCharacterTextSplitter( chunk_size=self.config.child_chunk_size, chunk_overlap=self.config.child_chunk_overlap, separators=self.config.separators, ) def split(self, docs: list[Document]) -> list[Document]: """切分文档,返回切片列表""" chunks = self._splitter.split_documents(docs) print(f" SimpleSplitter: {len(docs)} 个文档 → {len(chunks)} 个切片") return chunks class ParentChildSplitter: """ 父子文档切分器。 检索时用子文档做向量匹配(精准),返回父文档(完整上下文)。 这是生产推荐方案。 使用模式: 1. 配合 VectorStore 使用(第3篇集成): splitter.build_retriever(vectorstore) 2. 仅切分查看效果(本篇验证用): parent_chunks, child_chunks = splitter.split_for_inspection(docs) """ def __init__(self, config: SplitterConfig = None): self.config = config or SplitterConfig() self._parent_splitter = RecursiveCharacterTextSplitter( chunk_size=self.config.parent_chunk_size, chunk_overlap=self.config.parent_chunk_overlap, separators=self.config.separators, ) self._child_splitter = RecursiveCharacterTextSplitter( chunk_size=self.config.child_chunk_size, chunk_overlap=self.config.child_chunk_overlap, separators=self.config.separators, ) # 文档存储(第3篇会替换为持久化存储) self.docstore = InMemoryStore() def split_for_inspection( self, docs: list[Document] ) -> tuple[list[Document], list[Document]]: """ 切分文档并返回父子文档对(用于调试和验证)。 Returns: (parent_chunks, child_chunks) 的元组 """ # 先切分成父文档 parent_chunks = self._parent_splitter.split_documents(docs) # 再把每个父文档切分成子文档 # 同时在子文档的 metadata 里记录对应的父文档索引 child_chunks = [] for i, parent in enumerate(parent_chunks): parent_id = f"parent_{i}" parent.metadata["parent_id"] = parent_id # 对父文档进行子切分 children = self._child_splitter.split_documents([parent]) for child in children: child.metadata["parent_id"] = parent_id child_chunks.append(child) print(f" ParentChildSplitter 切分结果:") print(f" 原始文档: {len(docs)} 个") print(f" 父文档块: {len(parent_chunks)} 个" f"(平均 {sum(len(p.page_content) for p in parent_chunks) // max(len(parent_chunks), 1)} 字符)") print(f" 子文档块: {len(child_chunks)} 个" f"(平均 {sum(len(c.page_content) for c in child_chunks) // max(len(child_chunks), 1)} 字符)") print(f" 父子比例: 1:{len(child_chunks) // max(len(parent_chunks), 1)}") return parent_chunks, child_chunks def build_retriever(self, vectorstore) -> ParentDocumentRetriever: """ 构建 ParentDocumentRetriever(第3篇调用)。 Args: vectorstore: 已初始化的 Milvus VectorStore 实例 Returns: 配置好的 ParentDocumentRetriever """ return ParentDocumentRetriever( vectorstore=vectorstore, docstore=self.docstore, child_splitter=self._child_splitter, parent_splitter=self._parent_splitter, )
现在把所有内容整合起来,创建一个可以独立运行的验证脚本,同时完成 core/loader.py 的最终版本。
# core/loader.py(完整版) """ 文档加载与清洗模块 职责: 1. 从磁盘读取各格式文档,统一转换为 Document 对象 2. 对 page_content 进行文本清洗,去除噪声 3. 补充元数据(来源、文件类型、入库时间等) 对外接口: - load_document(file_path) → list[Document] - load_directory(dir_path) → list[Document] - clean_documents(docs) → list[Document] """ import os import re from pathlib import Path from datetime import datetime from typing import Callable from langchain_core.documents import Document from langchain_community.document_loaders import ( TextLoader, PyPDFLoader, UnstructuredMarkdownLoader, ) # ───────────────────────────────────────────────────────── # 元数据处理 # ───────────────────────────────────────────────────────── def enrich_metadata(docs: list[Document], file_path: str) -> list[Document]: file_name = os.path.basename(file_path) file_type = os.path.splitext(file_name)[1].lower().lstrip(".") ingested_at = datetime.now().isoformat() for doc in docs: doc.metadata.update({ "source": file_path, "file_name": file_name, "file_type": file_type, "ingested_at": ingested_at, }) return docs # ───────────────────────────────────────────────────────── # 清洗函数 # ───────────────────────────────────────────────────────── def remove_extra_whitespace(text: str) -> str: text = re.sub(r'[ \t]+$', '', text, flags=re.MULTILINE) text = re.sub(r' {2,}', ' ', text) text = re.sub(r'\n{3,}', '\n\n', text) return text.strip() def remove_page_markers(text: str) -> str: patterns = [ r'[-─━]+\s*第\s*\d+\s*页\s*[-─━]*', r'第\s*\d+\s*页\s*[/共]\s*\d+\s*页', r'Page\s+\d+\s*(?:of\s+\d+)?', r'^\d+\s*$', r'[-─━=]{10,}', ] for pattern in patterns: text = re.sub(pattern, '', text, flags=re.MULTILINE | re.IGNORECASE) return text def remove_copyright_notices(text: str) -> str: patterns = [ r'©.*?版权所有.*?\n', r'Copyright\s*©.*?\n', r'All\s+[Rr]ights?\s+[Rr]eserved\.?.*?\n', r'Confidential\s*[-–]\s*.*?\n', r'仅供内部使用.*?\n', ] for pattern in patterns: text = re.sub(pattern, '', text, flags=re.IGNORECASE) return text def remove_table_of_contents_lines(text: str) -> str: text = re.sub(r'^.+[.·\s]{5,}\d+\s*$', '', text, flags=re.MULTILINE) return text def normalize_chinese_punctuation(text: str) -> str: replacements = {'\x00': '', '\ufffd': '', '\u200b': ''} for old, new in replacements.items(): text = text.replace(old, new) return text def remove_lone_special_chars(text: str) -> str: text = re.sub(r'^[●■▶►▷○◆◇★☆•·]+\s*$', '', text, flags=re.MULTILINE) return text CLEAN_PIPELINE_STANDARD: list[Callable] = [ remove_extra_whitespace, remove_page_markers, remove_copyright_notices, remove_table_of_contents_lines, normalize_chinese_punctuation, remove_lone_special_chars, remove_extra_whitespace, ] CLEAN_PIPELINE_MINIMAL: list[Callable] = [ remove_extra_whitespace, normalize_chinese_punctuation, ] def clean_text( text: str, pipeline: list[Callable] = CLEAN_PIPELINE_STANDARD ) -> str: for fn in pipeline: text = fn(text) return text def clean_documents( docs: list[Document], pipeline: list[Callable] = CLEAN_PIPELINE_STANDARD, min_length: int = 20, ) -> list[Document]: cleaned = [] for doc in docs: cleaned_text = clean_text(doc.page_content, pipeline) if len(cleaned_text) >= min_length: doc.page_content = cleaned_text cleaned.append(doc) filtered = len(docs) - len(cleaned) if filtered > 0: print(f" [Cleaner] 过滤掉 {filtered} 个过短块(< {min_length} 字符)") return cleaned # ───────────────────────────────────────────────────────── # 文档加载 # ───────────────────────────────────────────────────────── SUPPORTED_EXTENSIONS = { ".txt": "text", ".md": "markdown", ".markdown": "markdown", ".pdf": "pdf", } def load_document(file_path: str) -> list[Document]: path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"文件不存在:{file_path}") ext = path.suffix.lower() if ext not in SUPPORTED_EXTENSIONS: raise ValueError(f"不支持的文件格式:{ext}") file_type = SUPPORTED_EXTENSIONS[ext] if file_type == "text": loader = TextLoader(file_path, encoding="utf-8") docs = loader.load() elif file_type == "markdown": try: loader = UnstructuredMarkdownLoader(file_path, mode="elements") docs = loader.load() except Exception: loader = TextLoader(file_path, encoding="utf-8") docs = loader.load() elif file_type == "pdf": loader = PyPDFLoader(file_path) docs = loader.load() docs = enrich_metadata(docs, file_path) return docs def load_directory(dir_path: str) -> list[Document]: all_docs = [] dir_path = Path(dir_path) for ext in SUPPORTED_EXTENSIONS.keys(): for file_path in dir_path.rglob(f"*{ext}"): try: docs = load_document(str(file_path)) all_docs.extend(docs) print(f" ✅ {file_path.name} ({len(docs)} 块)") except Exception as e: print(f" ❌ {file_path.name}: {e}") return all_docs
准备测试文档,在 data/raw/ 里创建 sample_tech_doc.md:
data/raw/
sample_tech_doc.md
# Python 异步编程指南 ## 1. 为什么需要异步编程 在传统的同步编程模型中,程序按顺序执行,每一步必须等待上一步完成才能继续。 当遇到 IO 操作(如网络请求、文件读写)时,CPU 会处于空闲等待状态,造成资源浪费。 异步编程的核心思想是:在等待 IO 的过程中,让 CPU 去处理其他任务,从而提高整体吞吐量。 Python 3.4 引入了 asyncio 标准库,Python 3.5 引入了 async/await 语法糖,使异步编程变得更加直观。 ## 2. asyncio 基础概念 ### 2.1 事件循环(Event Loop) 事件循环是 asyncio 的核心调度器。它维护一个任务队列,不断从队列中取出任务执行。 当某个任务遇到 IO 等待时,事件循环会暂停该任务,切换到其他可运行的任务,等 IO 完成后再恢复。 启动事件循环的标准方式: ```python import asyncio async def main(): print("Hello, asyncio!") asyncio.run(main())
协程是用 async def 定义的函数,调用协程函数返回一个协程对象,并不会立即执行。 协程必须通过 await 或者在事件循环中调度才会实际运行。
async def fetch_data(url: str) -> str: # 模拟异步 IO 操作 await asyncio.sleep(1) return f"Data from {url}"
gather 是最常用的并发原语,可以同时启动多个协程,等待所有协程完成。
async def main(): results = await asyncio.gather( fetch_data("https://api1.example.com"), fetch_data("https://api2.example.com"), fetch_data("https://api3.example.com"), ) # results 是一个列表,包含每个协程的返回值 print(results)
相比串行执行(3秒),gather 并发执行只需要约 1 秒,效率提升 3 倍。
create_task 可以在协程运行过程中动态创建新任务,更灵活:
async def main(): task1 = asyncio.create_task(fetch_data("url1")) task2 = asyncio.create_task(fetch_data("url2")) # 此时两个任务已经开始运行(不需要 await) result1 = await task1 result2 = await task2
使用 async with 语法,适合需要异步初始化和清理的资源(如数据库连接、HTTP Session):
async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json()
使用 async for 语法,适合处理流式数据:
async for chunk in response.content: process(chunk)
在使用 asyncio 时,有几个常见错误需要注意。
第一,不要在协程里使用阻塞的同步 IO 函数(如 requests.get、time.sleep), 这会阻塞整个事件循环。应该使用对应的异步版本(aiohttp、asyncio.sleep)。
第二,CPU 密集型任务不适合用协程,应该用 asyncio.run_in_executor 配合线程池或进程池。
第三,注意协程的取消(cancellation),在适当的地方处理 asyncio.CancelledError。
现在创建完整的端到端验证脚本: ```python # scripts/test_pipeline.py """ 端到端管道验证脚本 测试:文档加载 → 清洗 → 父子切分 的完整流程 运行:python scripts/test_pipeline.py """ import sys sys.path.append(".") from core.loader import load_document, load_directory, clean_documents from core.splitter import ParentChildSplitter, SimpleSplitter, SplitterConfig def print_separator(title: str): print(f"\n{'='*60}") print(f" {title}") print(f"{'='*60}") def show_chunk_preview(chunks, label: str, max_show: int = 2): """展示切片内容预览""" print(f"\n [{label}] 共 {len(chunks)} 个切片,预览前 {max_show} 个:") for i, chunk in enumerate(chunks[:max_show]): content_preview = chunk.page_content[:120].replace('\n', ' ') print(f"\n ── 切片 {i+1} ──") print(f" 字符数: {len(chunk.page_content)}") print(f" 来源: {chunk.metadata.get('file_name', 'unknown')}") if "parent_id" in chunk.metadata: print(f" 父文档ID: {chunk.metadata['parent_id']}") print(f" 内容预览: {content_preview}...") # ════════════════════════════════════════════════════════ # 阶段一:文档加载 # ════════════════════════════════════════════════════════ print_separator("阶段一:文档加载") docs = load_document("data/raw/sample_tech_doc.md") print(f"\n 加载结果:{len(docs)} 个 Document 对象") print(f" 总字符数:{sum(len(d.page_content) for d in docs)}") print(f"\n 元数据样例:") for key, val in docs[0].metadata.items(): print(f" {key}: {val}") # ════════════════════════════════════════════════════════ # 阶段二:文本清洗 # ════════════════════════════════════════════════════════ print_separator("阶段二:文本清洗") # 人为给文档注入一些噪声来验证清洗效果 test_doc_content = docs[0].page_content noise_injected = test_doc_content + """ \n\n 第 1 页 / 共 8 页 © 2024 TechBot. All Rights Reserved. 仅供内部使用,请勿外传 """ from langchain_core.documents import Document from core.loader import clean_text print(f"\n 注入噪声后字符数:{len(noise_injected)}") cleaned = clean_text(noise_injected) print(f" 清洗后字符数:{len(cleaned)}") # 验证页码被去除 assert "第 1 页" not in cleaned, "页码应该被去除" assert "All Rights Reserved" not in cleaned, "版权声明应该被去除" assert "仅供内部使用" not in cleaned, "内部声明应该被去除" print(" ✅ 清洗验证通过:页码、版权声明均已去除") # 对加载的文档执行清洗 cleaned_docs = clean_documents(docs) print(f" 清洗后文档数量:{len(cleaned_docs)}") # ════════════════════════════════════════════════════════ # 阶段三:切分策略对比 # ════════════════════════════════════════════════════════ print_separator("阶段三A:简单切分") simple_splitter = SimpleSplitter() simple_chunks = simple_splitter.split(cleaned_docs) show_chunk_preview(simple_chunks, "SimpleSplitter") print_separator("阶段三B:父子文档切分(推荐)") config = SplitterConfig( parent_chunk_size=800, # 父文档:约 2-4 段 parent_chunk_overlap=80, child_chunk_size=200, # 子文档:约 1-2 句 child_chunk_overlap=20, ) pc_splitter = ParentChildSplitter(config=config) parent_chunks, child_chunks = pc_splitter.split_for_inspection(cleaned_docs) show_chunk_preview(parent_chunks, "父文档", max_show=1) show_chunk_preview(child_chunks, "子文档", max_show=3) # ════════════════════════════════════════════════════════ # 阶段四:父子关系验证 # ════════════════════════════════════════════════════════ print_separator("阶段四:验证父子关系") # 取第一个子文档,找到它对应的父文档 sample_child = child_chunks[2] parent_id = sample_child.metadata.get("parent_id") # 找到同一个 parent_id 的父文档 matching_parent = next( (p for p in parent_chunks if p.metadata.get("parent_id") == parent_id), None ) print(f"\n 子文档内容({len(sample_child.page_content)} 字符):") print(f" 「{sample_child.page_content[:100]}...」") if matching_parent: print(f"\n 对应父文档内容({len(matching_parent.page_content)} 字符):") print(f" 「{matching_parent.page_content[:200]}...」") # 验证子文档内容确实包含在父文档中 is_contained = sample_child.page_content[:50] in matching_parent.page_content print(f"\n ✅ 父子关系验证:子文档内容{'包含在' if is_contained else '不在'}父文档中") else: print(f" ⚠️ 未找到对应的父文档(parent_id: {parent_id})") print_separator("测试完成") print("\n ✅ 文档加载 → 清洗 → 父子切分 管道验证通过") print(" 下一步:第3篇将接入 BGE-M3 向量化 + Milvus 存储")
运行测试:
python scripts/test_pipeline.py
期望看到类似这样的输出:
============================================================ 阶段一:文档加载 ============================================================ 加载结果:1 个 Document 对象 总字符数:2847 元数据样例: source: data/raw/sample_tech_doc.md file_name: sample_tech_doc.md file_type: md ingested_at: 2024-01-15T10:23:45.123456 ============================================================ 阶段二:文本清洗 ============================================================ 注入噪声后字符数:2981 清洗后字符数:2849 ✅ 清洗验证通过:页码、版权声明均已去除 ============================================================ 阶段三A:简单切分 ============================================================ SimpleSplitter: 1 个文档 → 14 个切片 [SimpleSplitter] 共 14 个切片,预览前 2 个: ── 切片 1 ── 字符数: 198 来源: sample_tech_doc.md 内容预览: # Python 异步编程指南 ## 1. 为什么需要异步编程 在传统的同步编程模型中... ============================================================ 阶段三B:父子文档切分(推荐) ============================================================ ParentChildSplitter 切分结果: 原始文档: 1 个 父文档块: 5 个(平均 541 字符) 子文档块: 16 个(平均 156 字符) 父子比例: 1:3
切片大小没有通用最优解,需要根据文档特点调整。以下是经验性规律:
调参方法:跑完切分后,用 split_for_inspection 查看父子比例。父子比例在 1:3 到 1:5 之间通常是比较合理的。
split_for_inspection
PDF 质量差是 RAG 项目中最常见的困扰。排查步骤:
# 快速检查 PDF 提取质量 from langchain_community.document_loaders import PyPDFLoader loader = PyPDFLoader("your_file.pdf") pages = loader.load() for i, page in enumerate(pages[:3]): print(f"\n--- 第 {i+1} 页 ---") print(repr(page.page_content[:200])) # 用 repr 显示隐藏字符
如果看到大量 \x00、\ufffd、字符间有空格等问题,说明 PDF 是扫描版或加密版。解决方案: - 扫描版 PDF:使用 OCR(推荐 pytesseract 或 paddleocr) - 加密版 PDF:先用 pypdf 解密,再提取
\x00
\ufffd
pytesseract
paddleocr
pypdf
UnstructuredMarkdownLoader 在 mode="elements" 模式下会把每个元素(标题、段落、列表项)各自返回为一个 Document,可能产生几十个碎片。
UnstructuredMarkdownLoader
mode="elements"
解决方案一:改用 mode="single"(整个文件作为一个 Document,切分交给后面的 Splitter)
mode="single"
loader = UnstructuredMarkdownLoader(file_path, mode="single")
解决方案二:合并过短的 Document:
def merge_short_docs( docs: list[Document], min_length: int = 100 ) -> list[Document]: """把过短的相邻文档合并,直到达到 min_length""" merged = [] buffer = "" buffer_metadata = {} for doc in docs: if len(buffer) + len(doc.page_content) < min_length: buffer += "\n" + doc.page_content buffer_metadata = doc.metadata # 保留最后一个的 metadata else: if buffer: merged.append(Document( page_content=buffer.strip(), metadata=buffer_metadata, )) buffer = doc.page_content buffer_metadata = doc.metadata if buffer: merged.append(Document(page_content=buffer.strip(), metadata=buffer_metadata)) return merged
某些文档里包含控制字符或特殊 Unicode 符号,会在后续向量化时引发报错。可以在清洗管道末尾加一个兜底清洗:
def remove_control_characters(text: str) -> str: """去除 ASCII 控制字符(0x00-0x1F 和 0x7F,保留换行和制表符)""" return re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)
本篇我们完成了离线链路的前两个阶段,把代码组织成了可复用的模块:
core/loader.py:处理文档加载和清洗 - 支持 TXT / Markdown / PDF 三种格式,自动路由 - 提供标准清洗管道,覆盖页码、版权声明、多余空白等常见噪声 - 统一元数据结构,为后续过滤检索打好基础
core/splitter.py:处理文档切分 - SimpleSplitter:适合快速实验 - ParentChildSplitter:生产推荐方案,解决检索精度和上下文完整性的矛盾
SimpleSplitter
ParentChildSplitter
一个关键认知值得再强调一遍:文档处理质量是 RAG 系统效果的真实上限。精心设计的清洗管道和切分策略,往往比换一个更大的模型更能提升实际问答效果。
下一篇预告
离线链路还差最后一步:把切分好的文档变成向量,存入 Milvus。
第三篇我们将深入讲解: - BGE-M3 为什么能同时生成稠密和稀疏两种向量,它们各自代表什么 - Milvus 的核心概念(Collection、Index、Schema)和 Milvus Lite 的使用 - 如何把 ParentChildSplitter 和 Milvus 真正连接起来,完成离线入库全链路 - 混合检索的底层原理和 LangChain 的集成方式
Milvus
本篇新增代码文件: - core/loader.py:文档加载与清洗模块(完整版) - core/splitter.py:文档切分模块(SimpleSplitter + ParentChildSplitter) - scripts/test_cleaner.py:清洗效果验证 - scripts/test_pipeline.py:端到端管道验证 - data/raw/sample_tech_doc.md:测试用技术文档
scripts/test_pipeline.py
data/raw/sample_tech_doc.md
上一篇:《RAG 系统全景导读:从 FAQ 到智能问答的工程跨越》 下一篇:《向量化与存储:用 BGE-M3 和 Milvus 构建混合检索库》
还没有评论,来抢沙发吧!
这个人很懒,什么都没写...
3 篇文章
还没有评论,来抢沙发吧!