系列导航:[第一篇] → [第二篇] → [第三篇] → 第四篇(当前) → 第五篇:OutputParser 详解 前置要求:已掌握前三篇内容,熟悉 ChatPromptTemplate 和 ChatOpenAI 的基本用法 本篇目标:深透理解 LCEL(LangChain Expression Language)的设计哲学和底层机制,掌握 Runnable 接口体系、所有核心组合原语的使用方式,能够构建从简单到复杂的各类处理链,并学会调试、测试和错误处理。
系列导航:[第一篇] → [第二篇] → [第三篇] → 第四篇(当前) → 第五篇:OutputParser 详解
前置要求:已掌握前三篇内容,熟悉 ChatPromptTemplate 和 ChatOpenAI 的基本用法
本篇目标:深透理解 LCEL(LangChain Expression Language)的设计哲学和底层机制,掌握 Runnable 接口体系、所有核心组合原语的使用方式,能够构建从简单到复杂的各类处理链,并学会调试、测试和错误处理。
在第一篇里,我们看到了 LCEL 的基本形态:
chain = prompt | llm | parser result = chain.invoke({"input": "你好"})
你可能会想:这不就是把函数调用串起来吗?我自己写个 pipeline 函数不也一样?
pipeline
# 自己实现的"链" def my_chain(input_dict): prompt_value = prompt.invoke(input_dict) ai_message = llm.invoke(prompt_value) result = parser.invoke(ai_message) return result
这段代码在功能上确实等价,但一旦需求稍微复杂一点,差距就出来了:
需求一:流式输出
# 自己实现:需要特殊处理 def my_chain_stream(input_dict): prompt_value = prompt.invoke(input_dict) for chunk in llm.stream(prompt_value): # 只有 llm 支持流式 yield parser.invoke(chunk) # parser 要单独适配 # LCEL:自动传递,无需任何改动 for chunk in chain.stream({"input": "你好"}): # 流式自动穿透整条链 print(chunk, end="")
需求二:异步支持
# 自己实现:要重写一遍异步版本 async def my_chain_async(input_dict): prompt_value = await prompt.ainvoke(input_dict) ai_message = await llm.ainvoke(prompt_value) return await parser.ainvoke(ai_message) # LCEL:自动支持,chain 同时具备同步和异步能力 result = await chain.ainvoke({"input": "你好"})
需求三:批量并发
# 自己实现:需要手写并发逻辑 import asyncio async def my_chain_batch(inputs): tasks = [my_chain_async(inp) for inp in inputs] return await asyncio.gather(*tasks) # LCEL:一行搞定,内置并发控制 results = chain.batch(inputs, config={"max_concurrency": 5})
需求四:中间步骤追踪
# 自己实现:要手动埋点 def my_chain_with_trace(input_dict): prompt_value = prompt.invoke(input_dict) print(f"[Prompt] {prompt_value}") # 手动打印 ai_message = llm.invoke(prompt_value) print(f"[LLM] {ai_message}") # 手动打印 return parser.invoke(ai_message) # LCEL:对接 LangSmith,自动追踪每一步的输入输出,无需任何代码改动
这就是 LCEL 的真正价值——它不是语法糖,而是一套统一的组件协议,让所有符合这个协议的组件天然获得流式、异步、批量、追踪等能力,而你不需要在每个组件里重复实现这些横切关注点。
LCEL 能够工作的根本原因,是所有 LangChain 组件都实现了同一个基础接口:Runnable。
Runnable
# langchain_core/runnables/base.py(简化版) from abc import ABC, abstractmethod from typing import Any, Iterator, AsyncIterator class Runnable(ABC): """所有 LangChain 组件的基类""" @abstractmethod def invoke(self, input: Any, config=None) -> Any: """同步调用,返回单个结果""" ... def stream(self, input: Any, config=None) -> Iterator: """流式调用,yield 多个结果块""" # 默认实现:把 invoke 的结果包装成单元素迭代器 yield self.invoke(input, config) def batch(self, inputs: list, config=None) -> list: """批量调用,返回结果列表""" return [self.invoke(inp, config) for inp in inputs] async def ainvoke(self, input: Any, config=None) -> Any: """异步版 invoke""" ... async def astream(self, input: Any, config=None) -> AsyncIterator: """异步版 stream""" ... # 关键:| 运算符的实现 def __or__(self, other: "Runnable") -> "RunnableSequence": return RunnableSequence(first=self, last=other)
正是 __or__ 方法的实现,让 prompt | llm | parser 这个语法成为可能。每次 | 运算都返回一个 RunnableSequence 对象,而 RunnableSequence 本身也是一个 Runnable,所以可以无限串联。
__or__
prompt | llm | parser
|
RunnableSequence
所有 LangChain 的核心组件都实现了 Runnable 接口:
ChatPromptTemplate
dict
ChatPromptValue
ChatOpenAI
str
AIMessage
StrOutputParser
JsonOutputParser
RunnableLambda(fn)
RunnablePassthrough()
RunnableParallel({...})
这意味着,只要你把自定义函数包装成 RunnableLambda,它就能无缝插入任何 LCEL 链中,自动获得流式、异步、批量等能力。
RunnableLambda
from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnableSerializable llm = ChatOpenAI(model="gpt-4o") prompt = ChatPromptTemplate.from_messages([("human", "{input}")]) parser = StrOutputParser() # 验证都是 Runnable 的子类 print(isinstance(llm, RunnableSerializable)) # True print(isinstance(prompt, RunnableSerializable)) # True print(isinstance(parser, RunnableSerializable)) # True # 验证 chain 也是 Runnable chain = prompt | llm | parser print(isinstance(chain, RunnableSerializable)) # True # 查看链的结构 print(chain) # first=ChatPromptTemplate(...) # middle=[ChatOpenAI(...)] # last=StrOutputParser()
理解了 Runnable 接口,再来深入看四种调用方式。
from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser chain = ChatPromptTemplate.from_messages([("human", "{q}")]) | ChatOpenAI(model="gpt-4o") | StrOutputParser() # 基本调用 result = chain.invoke({"q": "什么是 Python?"}) print(type(result)) # <class 'str'> # 带配置的调用(覆盖模型参数、添加回调等) from langchain_core.runnables import RunnableConfig result = chain.invoke( {"q": "什么是 Python?"}, config=RunnableConfig( tags=["production"], # 标签,用于 LangSmith 筛选 metadata={"user_id": "u123"}, # 元数据 max_concurrency=1, # 并发限制 run_name="qa_chain", # 本次运行的名称 ) )
LCEL 的流式传播是其最精妙的设计之一。当调用 chain.stream() 时:
chain.stream()
prompt.invoke()
llm.stream()
parser
chain = ChatPromptTemplate.from_messages([("human", "{q}")]) | ChatOpenAI(model="gpt-4o") | StrOutputParser() # 流式输出 print("开始输出:", end="") for chunk in chain.stream({"q": "用 100 字介绍 LangChain"}): print(chunk, end="", flush=True) print("\n结束") # 如果你需要在流式的同时收集完整结果 full_output = "" for chunk in chain.stream({"q": "介绍 RAG 技术"}): print(chunk, end="", flush=True) full_output += chunk print(f"\n\n完整输出长度:{len(full_output)} 字符")
一个常见误区:JsonOutputParser 在流式模式下会尝试增量解析 JSON,输出的是部分解析的字典而不是字符串 chunk,这在某些场景很有用:
from langchain_core.output_parsers import JsonOutputParser chain = ( ChatPromptTemplate.from_messages([ ("system", "只输出 JSON,不要 Markdown"), ("human", "列出 3 种编程语言及其特点,格式:{{\"languages\": [...]}}"), ]) | ChatOpenAI(model="gpt-4o") | JsonOutputParser() ) # 流式输出时,JsonOutputParser 会增量输出部分解析的字典 for chunk in chain.stream({}): print(chunk) # 每次输出当前已能解析的部分字典 # {} # {'languages': []} # {'languages': [{'name': 'Python'}]} # {'languages': [{'name': 'Python', 'feature': '简洁易读'}]} # ... 逐步补全
batch 的内部实现是用 ThreadPoolExecutor 并发执行多个 invoke:
batch
ThreadPoolExecutor
invoke
chain = ChatPromptTemplate.from_messages([("human", "{q}")]) | ChatOpenAI(model="gpt-4o") | StrOutputParser() questions = [ {"q": "什么是 Python?"}, {"q": "什么是 JavaScript?"}, {"q": "什么是 Rust?"}, {"q": "什么是 Go?"}, ] import time # 串行执行(慢) t0 = time.time() results_serial = [chain.invoke(q) for q in questions] print(f"串行耗时:{time.time()-t0:.1f}s") # batch 并发执行(快) t0 = time.time() results_batch = chain.batch(questions, config={"max_concurrency": 4}) print(f"batch 并发耗时:{time.time()-t0:.1f}s") # 约为串行的 1/4 # batch 允许部分失败,return_exceptions=True 时失败的请求返回异常对象而非抛出 results = chain.batch( questions, config={"max_concurrency": 3}, return_exceptions=True, # 失败不中断整个 batch ) for i, r in enumerate(results): if isinstance(r, Exception): print(f"请求 {i} 失败:{r}") else: print(f"请求 {i} 成功:{r[:30]}...")
import asyncio from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser chain = ChatPromptTemplate.from_messages([("human", "{q}")]) | ChatOpenAI(model="gpt-4o") | StrOutputParser() async def demo(): # 异步单次调用 result = await chain.ainvoke({"q": "什么是协程?"}) print(result) # 异步流式 async for chunk in chain.astream({"q": "介绍 asyncio"}): print(chunk, end="", flush=True) print() # 真正的并发:多个异步任务同时执行 tasks = [ chain.ainvoke({"q": "介绍 Python"}), chain.ainvoke({"q": "介绍 JavaScript"}), chain.ainvoke({"q": "介绍 Rust"}), ] results = await asyncio.gather(*tasks) for r in results: print(r[:50]) asyncio.run(demo())
LCEL 提供了一组"组合原语"(Combinators),用于处理各种复杂的数据流场景。
RunnablePassthrough 是最简单的原语,它原封不动地把输入传递给输出。单独使用没什么意义,但在链中配合 assign 方法,可以在不改变原始输入的情况下附加新字段,这在 RAG 链中极其常见:
RunnablePassthrough
assign
from langchain_core.runnables import RunnablePassthrough from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser # 基本用法:原样透传 passthrough = RunnablePassthrough() result = passthrough.invoke({"key": "value"}) print(result) # {'key': 'value'} # 核心用法:assign 附加新字段 # 场景:在 RAG 中,既要保留原始 question,又要附加检索到的 context from langchain_core.runnables import RunnablePassthrough # 模拟检索器(实际会是 VectorStore.as_retriever()) def fake_retriever(question: str) -> str: return f"关于'{question}'的相关文档内容..." chain = ( RunnablePassthrough.assign( # assign 会把函数的返回值作为新字段附加到 dict 中 context=lambda x: fake_retriever(x["question"]) ) | ChatPromptTemplate.from_messages([ ("system", "根据以下上下文回答问题:\n\n{context}"), ("human", "{question}"), ]) | ChatOpenAI(model="gpt-4o") | StrOutputParser() ) result = chain.invoke({"question": "什么是向量数据库?"}) print(result) # 链中的数据流: # {"question": "什么是向量数据库?"} # ↓ RunnablePassthrough.assign(context=...) # {"question": "什么是向量数据库?", "context": "关于'什么是向量数据库?'的相关文档内容..."} # ↓ ChatPromptTemplate # 格式化后的消息(包含 context 和 question) # ↓ ChatOpenAI # AIMessage # ↓ StrOutputParser # str
这是 LCEL 最灵活的扩展点。任何 Python 函数都可以通过 RunnableLambda 包装后插入链中:
from langchain_core.runnables import RunnableLambda from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser # 普通函数包装 def preprocess(input_dict: dict) -> dict: """预处理:清理输入文本""" return { **input_dict, "question": input_dict["question"].strip().rstrip("??") + "?", } def postprocess(text: str) -> dict: """后处理:包装成标准响应格式""" return { "answer": text, "word_count": len(text), "status": "success", } chain = ( RunnableLambda(preprocess) | ChatPromptTemplate.from_messages([("human", "{question}")]) | ChatOpenAI(model="gpt-4o") | StrOutputParser() | RunnableLambda(postprocess) ) result = chain.invoke({"question": " 什么是 LangChain "}) print(result) # {'answer': '...LangChain 是...', 'word_count': 200, 'status': 'success'}
装饰器写法(更简洁):
from langchain_core.runnables import chain as runnable_chain # 用 @chain 装饰器,直接把函数变成 Runnable @runnable_chain def my_custom_chain(input_dict: dict) -> str: """自定义链逻辑""" # 可以在这里组合多个 Runnable 调用 llm = ChatOpenAI(model="gpt-4o") question = input_dict["question"] # 生成搜索关键词 keywords = ( ChatPromptTemplate.from_messages([ ("human", "提取关键词(只输出关键词,逗号分隔):{q}") ]) | llm | StrOutputParser() ).invoke({"q": question}) # 用关键词搜索后生成回答(这里简化模拟) return f"基于关键词 [{keywords}] 的回答:..." result = my_custom_chain.invoke({"question": "如何优化 Python 性能?"})
异步 Lambda:
import asyncio from langchain_core.runnables import RunnableLambda async def async_fetch(input_dict: dict) -> dict: """异步获取外部数据""" await asyncio.sleep(0.1) # 模拟 IO 等待 return {**input_dict, "fetched_data": "从外部 API 获取的数据"} # RunnableLambda 自动检测函数是否是协程 async_step = RunnableLambda(async_fetch) result = asyncio.run(async_step.ainvoke({"question": "你好"}))
RunnableParallel 接收同一个输入,同时发给多个子链,最后把所有结果合并成一个字典:
RunnableParallel
from langchain_core.runnables import RunnableParallel, RunnablePassthrough from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser llm = ChatOpenAI(model="gpt-4o") # 场景:对同一段文本同时做摘要、情感分析、关键词提取 summary_chain = ( ChatPromptTemplate.from_messages([("human", "用 50 字总结:\n{text}")]) | llm | StrOutputParser() ) sentiment_chain = ( ChatPromptTemplate.from_messages([("human", "判断情感(正面/负面/中性):\n{text}")]) | llm | StrOutputParser() ) keywords_chain = ( ChatPromptTemplate.from_messages([("human", "提取关键词(最多 5 个):\n{text}")]) | llm | StrOutputParser() ) # 三个链并行执行,输入都是 {"text": ...} parallel_chain = RunnableParallel( summary=summary_chain, sentiment=sentiment_chain, keywords=keywords_chain, original=RunnablePassthrough(), # 原样保留输入 ) result = parallel_chain.invoke({ "text": "LangChain 是一个强大的 LLM 应用开发框架,极大地简化了 AI 应用的构建流程。" }) print("摘要:", result["summary"]) print("情感:", result["sentiment"]) print("关键词:", result["keywords"]) print("原始:", result["original"]) # result 是一个字典,包含所有分支的结果
简写方式:直接传字典给 | 运算符,会自动转成 RunnableParallel:
# 等价写法,更简洁 parallel_chain = { "summary": summary_chain, "sentiment": sentiment_chain, "keywords": keywords_chain, } # 字典会被自动识别为 RunnableParallel,可以直接用 | 串联 full_chain = parallel_chain | (lambda x: f"摘要:{x['summary']}\n情感:{x['sentiment']}")
根据条件选择不同的执行路径:
from langchain_core.runnables import RunnableBranch, RunnableLambda from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser llm = ChatOpenAI(model="gpt-4o") # 定义不同类型问题的处理链 technical_chain = ( ChatPromptTemplate.from_messages([ ("system", "你是技术专家,给出详细的技术说明和代码示例。"), ("human", "{question}"), ]) | llm | StrOutputParser() ) casual_chain = ( ChatPromptTemplate.from_messages([ ("system", "你是友好的助手,用轻松的语气回答。"), ("human", "{question}"), ]) | llm | StrOutputParser() ) default_chain = ( ChatPromptTemplate.from_messages([("human", "{question}")]) | llm | StrOutputParser() ) # 构建条件分支 branch = RunnableBranch( # (条件函数, 对应的链) (lambda x: any(kw in x["question"] for kw in ["代码", "算法", "实现", "API"]), technical_chain), (lambda x: any(kw in x["question"] for kw in ["聊天", "推荐", "好玩", "有趣"]), casual_chain), # 默认分支(没有条件) default_chain, ) # 测试 r1 = branch.invoke({"question": "如何实现一个 LRU 缓存?"}) # → technical_chain r2 = branch.invoke({"question": "你有什么好玩的游戏推荐吗?"}) # → casual_chain r3 = branch.invoke({"question": "今天天气怎么样?"}) # → default_chain
operator.itemgetter 和 RunnableLambda 配合,可以灵活地从链的输出中提取特定字段:
operator.itemgetter
from operator import itemgetter from langchain_core.runnables import RunnablePassthrough # 场景:链返回的是字典,但下一步只需要其中某个字段 chain_returns_dict = RunnableParallel( answer=answer_chain, sources=source_chain, ) # 只取 answer 字段传给下一步 final_chain = chain_returns_dict | itemgetter("answer") | some_further_processing
写 LCEL 链时,最容易出错的地方就是类型不匹配。理解每个组件的输入输出类型,能帮你快速定位问题。
from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI from langchain_core.output_parsers import StrOutputParser, JsonOutputParser from langchain_core.messages import AIMessage from langchain_core.prompt_values import ChatPromptValue # 显式展示每一步的类型 prompt = ChatPromptTemplate.from_messages([("human", "{q}")]) llm = ChatOpenAI(model="gpt-4o") parser = StrOutputParser() # 手动逐步执行,观察类型变化 step1: ChatPromptValue = prompt.invoke({"q": "你好"}) print(f"Step1 类型:{type(step1)}") # ChatPromptValue print(f"Step1 内容:{step1.to_messages()}") # [HumanMessage(content='你好')] step2: AIMessage = llm.invoke(step1) print(f"\nStep2 类型:{type(step2)}") # AIMessage print(f"Step2 内容:{step2.content[:50]}") # 你好!我是... step3: str = parser.invoke(step2) print(f"\nStep3 类型:{type(step3)}") # str print(f"Step3 内容:{step3[:50]}") # 你好!我是...
错误一:直接把字符串传给 prompt
chain = prompt | llm | parser # ❌ 报错:prompt 期望 dict,收到 str chain.invoke("你好") # ✅ 正确 chain.invoke({"q": "你好"})
错误二:prompt 变量名与传入 dict 的键名不匹配
prompt = ChatPromptTemplate.from_messages([("human", "{question}")]) chain = prompt | llm | parser # ❌ 报错:缺少变量 'question' chain.invoke({"q": "你好"}) # ✅ 正确:键名要和模板里的变量名完全一致 chain.invoke({"question": "你好"})
错误三:链的中间步骤改变了 dict 结构,导致后续步骤找不到变量
# ❌ 问题场景 chain = ( RunnablePassthrough.assign(context=get_context) # 增加了 context 字段 | prompt # prompt 里有 {question} 和 {context},看起来没问题 | llm | StrOutputParser() | RunnableLambda(lambda x: {"answer": x}) # 返回新 dict | prompt # ❌ 报错!此时输入是 {"answer": ...},没有 {question} 和 {context} 了 )
解决原则:在 LCEL 链中,每一步的输出都要和下一步的输入类型、字段匹配。当数据结构发生变化时(比如 StrOutputParser 把 dict→str),后续步骤要相应调整。
import langchain langchain.verbose = True # 全局开启 # 或者针对单条链开启 from langchain_core.callbacks import StdOutCallbackHandler result = chain.invoke( {"q": "你好"}, config={"callbacks": [StdOutCallbackHandler()]} )
# 大链调试的最佳实践:把链拆成多段,分别测试 prompt = ChatPromptTemplate.from_messages([("human", "{q}")]) llm = ChatOpenAI(model="gpt-4o") parser = StrOutputParser() # 第一段 chain_part1 = prompt output1 = chain_part1.invoke({"q": "你好"}) print("Part1 输出:", output1) assert output1 is not None, "Part1 失败" # 第二段 chain_part2 = prompt | llm output2 = chain_part2.invoke({"q": "你好"}) print("Part2 输出:", output2) assert isinstance(output2, AIMessage), "Part2 失败" # 完整链 chain_full = prompt | llm | parser output3 = chain_full.invoke({"q": "你好"}) print("Full 输出:", output3)
chain = prompt | llm | parser # 打印链的结构(JSON 格式) import json print(json.dumps(chain.get_graph().to_json(), indent=2, ensure_ascii=False)) # 可视化(需要安装 grandalf) chain.get_graph().print_ascii() # +-----------------------------+ # | ChatPromptTemplate | # +-----------------------------+ # * # * # +-----------------------------+ # | ChatOpenAI | # +-----------------------------+ # * # * # +-----------------------------+ # | StrOutputParser | # +-----------------------------+
# 为整条链设置运行名称,方便在 LangSmith 里筛选 chain_with_name = chain.with_config({"run_name": "production_qa_chain"}) # 为链设置默认配置 chain_with_defaults = chain.with_config( configurable={}, tags=["v2.0"], metadata={"env": "prod"}, )
LCEL 提供了一种优雅的机制,让你在不重建链的情况下,在调用时动态修改链的部分参数。这在需要根据用户请求调整模型行为的生产场景中非常实用。
假设你构建了一个写作助手,不同用户希望使用不同的创作风格(temperature)和输出长度(max_tokens):
from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import ConfigurableField llm = ChatOpenAI(model="gpt-4o", temperature=0.7).configurable_fields( # 声明 temperature 字段为"可在运行时配置" temperature=ConfigurableField( id="llm_temperature", # 运行时引用这个字段用的 ID name="模型创意程度", # 可读名称(用于文档) description="0=严谨确定,1=标准,2=最大创意", ), max_tokens=ConfigurableField( id="llm_max_tokens", name="最大输出长度", ), ) prompt = ChatPromptTemplate.from_messages([ ("system", "你是一位专业的写作助手。"), ("human", "请写一篇关于 {topic} 的文章。"), ]) chain = prompt | llm | StrOutputParser() # 普通调用(使用默认参数:temperature=0.7) result1 = chain.invoke({"topic": "秋天"}) # 运行时修改参数:高创意模式 result2 = chain.invoke( {"topic": "秋天"}, config={"configurable": {"llm_temperature": 1.5, "llm_max_tokens": 500}}, ) # 运行时修改参数:严谨模式(适合技术文档) result3 = chain.invoke( {"topic": "TCP/IP 协议"}, config={"configurable": {"llm_temperature": 0.1, "llm_max_tokens": 2000}}, )
更强大的场景:根据运行时参数切换整个组件(比如切换底层模型),而不只是修改参数:
import os from langchain_openai import ChatOpenAI from langchain_anthropic import ChatAnthropic from langchain_core.runnables import ConfigurableField from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser # 定义默认模型 default_llm = ChatOpenAI(model="gpt-4o") # 为这个 LLM 位置配置可替换的备选模型 llm = default_llm.configurable_alternatives( which=ConfigurableField( id="llm_provider", name="模型供应商", description="选择要使用的大模型", ), # 备选模型列表(键名就是运行时传入的值) deepseek=ChatOpenAI( model="deepseek-chat", openai_api_key=os.getenv("DEEPSEEK_API_KEY"), openai_api_base="https://api.deepseek.com/v1", ), claude=ChatAnthropic(model="claude-opus-4-5"), gpt4o_mini=ChatOpenAI(model="gpt-4o-mini"), # default_key 指定默认使用哪个(不写则用 default_llm) ) chain = ( ChatPromptTemplate.from_messages([("human", "{question}")]) | llm | StrOutputParser() ) # 使用默认模型(gpt-4o) r1 = chain.invoke({"question": "你好"}) # 运行时切换到 DeepSeek r2 = chain.invoke( {"question": "你好"}, config={"configurable": {"llm_provider": "deepseek"}} ) # 运行时切换到 Claude r3 = chain.invoke( {"question": "你好"}, config={"configurable": {"llm_provider": "claude"}} ) print(f"GPT-4o: {r1[:30]}") print(f"DeepSeek: {r2[:30]}") print(f"Claude: {r3[:30]}")
这个特性在以下场景中极具价值:
bind 和 configurable_fields 是互补的:bind 在定义时固定参数(静态),configurable_fields 允许运行时修改(动态):
bind
configurable_fields
from langchain_openai import ChatOpenAI llm = ChatOpenAI(model="gpt-4o") # bind:创建参数被固定的新 Runnable(不影响原始 llm 对象) precise_llm = llm.bind(temperature=0.0, max_tokens=100) creative_llm = llm.bind(temperature=1.5) json_llm = llm.bind(response_format={"type": "json_object"}) # 强制 JSON 输出 # 实用场景:给模型绑定工具(将在第七篇详细讲) from langchain_core.tools import tool @tool def get_weather(city: str) -> str: """获取指定城市的天气""" return f"{city}今天晴天,25°C" # 把工具绑定到模型,模型获得调用这个工具的能力 llm_with_tools = llm.bind_tools([get_weather]) response = llm_with_tools.invoke("北京今天天气怎么样?") # 模型会生成 tool_use 类型的响应,而不是直接回答
from langchain_openai import ChatOpenAI from langchain_core.exceptions import OutputParserException llm = ChatOpenAI(model="gpt-4o") # 配置重试策略 llm_with_retry = llm.with_retry( retry_if_exception_type=(Exception,), # 遇到哪类异常才重试 stop_after_attempt=3, # 最多重试 3 次 wait_exponential_jitter=True, # 指数退避 + 随机抖动,避免雪崩 ) chain = prompt | llm_with_retry | parser
from langchain_openai import ChatOpenAI # 主模型 primary = ChatOpenAI(model="gpt-4o") # 降级模型列表(按优先级排列) fallback1 = ChatOpenAI(model="gpt-4o-mini") # 降级到更便宜的模型 fallback2 = ChatOpenAI( # 再降级到国产模型 model="deepseek-chat", openai_api_key="...", openai_api_base="https://api.deepseek.com/v1", ) # 构建带降级的链 resilient_llm = primary.with_fallbacks([fallback1, fallback2]) # 当 primary 失败时,自动尝试 fallback1,再失败则尝试 fallback2 chain = prompt | resilient_llm | parser
from langchain_core.runnables import RunnableLambda def safe_parse(text: str) -> dict: """带错误处理的解析步骤""" import json try: return json.loads(text) except json.JSONDecodeError: # 解析失败时返回结构化的错误信息 return {"error": "解析失败", "raw": text, "success": False} chain = ( prompt | llm | StrOutputParser() | RunnableLambda(safe_parse) # 安全的解析步骤 ) result = chain.invoke({"q": "返回 JSON 格式的自我介绍"}) if result.get("error"): print("解析失败,原始输出:", result["raw"]) else: print("解析成功:", result)
把本篇所有知识点综合应用,构建一个完整的检索增强生成(RAG)链。这个例子不涉及真实的向量数据库(那是第九篇的内容),而是聚焦于 LCEL 的链式组合模式:
import os from operator import itemgetter from dotenv import load_dotenv from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import ( RunnablePassthrough, RunnableParallel, RunnableLambda, ) from langchain_core.messages import HumanMessage, AIMessage load_dotenv() # ========== 模拟检索器 ========== # 实际项目中替换为 vectorstore.as_retriever() def mock_retriever(question: str) -> str: """模拟从知识库检索相关文档""" knowledge_base = { "langchain": "LangChain 是一个 LLM 应用开发框架,提供链式调用、提示词管理、记忆模块等功能。", "lcel": "LCEL(LangChain Expression Language)是 LangChain 1.0 的核心,通过 | 管道符组合 Runnable 组件。", "rag": "RAG(检索增强生成)通过在生成前检索相关文档,向模型提供外部知识,提升回答的准确性。", } question_lower = question.lower() for key, doc in knowledge_base.items(): if key in question_lower: return doc return "未找到相关文档。" # ========== 构建 RAG 链 ========== llm = ChatOpenAI(model="gpt-4o", temperature=0.3) # 1. 问题重写链:把多轮对话中的指代词解析清楚 question_rewrite_chain = ( ChatPromptTemplate.from_messages([ ("system", "根据对话历史,将用户最新问题改写为独立、完整的问题(不依赖上下文)。只输出改写后的问题。"), MessagesPlaceholder("chat_history"), ("human", "用户问题:{question}"), ]) | llm | StrOutputParser() ) # 2. 完整的 RAG 链(带多轮对话支持) rag_prompt = ChatPromptTemplate.from_messages([ ("system", """你是一个知识库问答助手,根据提供的上下文回答问题。 上下文: {context} 回答规则: - 基于上下文回答,不要编造信息 - 如果上下文中没有相关信息,明确说明 - 回答要简洁准确"""), MessagesPlaceholder("chat_history"), ("human", "{question}"), ]) rag_chain = ( RunnablePassthrough.assign( # 先重写问题(解决指代词问题) standalone_question=question_rewrite_chain, ) | RunnablePassthrough.assign( # 用重写后的问题检索文档 context=lambda x: mock_retriever(x["standalone_question"]), ) | RunnableParallel( # 并行:生成回答 + 保留原始问题(用于保存到历史) answer=rag_prompt | llm | StrOutputParser(), question=itemgetter("question"), context=itemgetter("context"), ) ) # ========== 带历史的对话管理器 ========== class RAGChatbot: def __init__(self, chain): self.chain = chain self.history = [] def chat(self, question: str) -> str: result = self.chain.invoke({ "question": question, "chat_history": self.history, }) # 更新历史 self.history.append(HumanMessage(content=question)) self.history.append(AIMessage(content=result["answer"])) return result["answer"] def show_context(self, question: str) -> str: """调试用:查看检索到的上下文""" result = self.chain.invoke({ "question": question, "chat_history": self.history, }) return result["context"] # ========== 使用 ========== bot = RAGChatbot(rag_chain) print(bot.chat("什么是 LangChain?")) print(bot.chat("它的 LCEL 是什么?")) # "它"会被正确解析为 LangChain print(bot.chat("RAG 技术是什么原理?"))
LCEL 非常强大,但它也不是银弹,了解它的局限有助于你做出正确的技术选型。
循环和递归:LCEL 本质上是有向无环图(DAG),不支持循环。如果你的 Agent 需要"反复调用工具直到满意为止"这类循环逻辑,LCEL 的 AgentExecutor 在内部用了 while 循环,但你自己用 LCEL 无法直接表达循环。这类场景需要使用第十篇介绍的 LangGraph。
AgentExecutor
复杂的状态管理:LCEL 的每次调用是无状态的,数据通过链传递。如果你需要跨步骤共享和修改复杂状态(比如多 Agent 协作时的共享黑板),LangGraph 的 State 机制更合适。
State
动态决定链的长度:如果执行步骤的数量在运行时才能确定(比如"递归分解任务,直到每个子任务足够简单"),LCEL 的静态链结构无法直接支持。
RunnableBranch
经过前面十个章节的系统讲解,我们来汇总一套在实际项目中经过验证的 LCEL 开发规范,帮助你少踩坑。
单一职责:每个链只做一件事
# ❌ 反例:一个链里塞了太多逻辑,难以测试和复用 monolithic_chain = ( RunnableLambda(validate_input) | RunnableLambda(preprocess) | RunnableLambda(fetch_context) | prompt | llm | parser | RunnableLambda(postprocess) | RunnableLambda(save_to_db) | RunnableLambda(send_notification) ) # ✅ 正例:拆分成有明确职责的子链,组合时逻辑清晰 retrieval_chain = RunnableLambda(fetch_context) # 检索子链 generation_chain = prompt | llm | parser # 生成子链 persistence_chain = RunnableLambda(save_to_db) # 持久化子链 full_chain = ( RunnableLambda(validate_input) | RunnablePassthrough.assign(context=retrieval_chain) | generation_chain | persistence_chain )
显式的数据结构:清晰的字段命名
# ❌ 反例:中间步骤用模糊的字段名,难以理解数据流 chain = ( RunnablePassthrough.assign(x=get_something) | RunnablePassthrough.assign(y=process_x) | prompt | llm ) # ✅ 正例:字段名描述其语义 chain = ( RunnablePassthrough.assign(retrieved_docs=retriever) | RunnablePassthrough.assign(formatted_context=format_docs) | rag_prompt | llm )
能并行的步骤尽量并行
# ❌ 串行执行三个独立的分析(耗时 = T1 + T2 + T3) chain = step1 | step2 | step3 # ✅ 并行执行独立步骤(耗时 ≈ max(T1, T2, T3)) chain = RunnableParallel( result1=step1, result2=step2, result3=step3, ) # 注意:只有步骤之间没有数据依赖时才能并行
用 batch 代替循环
questions = [{"q": f"问题{i}"} for i in range(20)] # ❌ 串行循环,速度慢 results = [chain.invoke(q) for q in questions] # ✅ batch 内部并发,速度快 results = chain.batch(questions, config={"max_concurrency": 5})
缓存确定性输出
from langchain_core.globals import set_llm_cache from langchain_community.cache import SQLiteCache # 开发阶段开启缓存,相同输入直接返回缓存结果 set_llm_cache(SQLiteCache(".cache.db")) # 注意:生产环境谨慎使用,缓存可能导致模型更新后还返回旧结果
让链可以被单元测试的关键,是把副作用(API 调用、数据库操作)隔离到可替换的 Runnable 中:
from langchain_core.language_models.fake import FakeListChatModel from langchain_core.runnables import RunnableLambda # 用 FakeListChatModel 替换真实 LLM,单元测试不消耗 Token fake_llm = FakeListChatModel( responses=["这是模拟的回答。"] # 预设返回值 ) # 业务逻辑链(llm 作为参数传入,便于替换) def create_qa_chain(llm): return ( ChatPromptTemplate.from_messages([("human", "{question}")]) | llm | StrOutputParser() ) # 测试代码 def test_qa_chain(): chain = create_qa_chain(fake_llm) result = chain.invoke({"question": "测试问题"}) assert result == "这是模拟的回答。" assert isinstance(result, str) # 生产代码 real_chain = create_qa_chain(ChatOpenAI(model="gpt-4o"))
KeyError: 'xxx'
prompt.input_variables
StrOutputParser()
\| StrOutputParser()
return_exceptions=True
partial
await
asyncio.run()
\|
stream
ainvoke
.assign()
with_retry
with_fallbacks
get_graph().print_ascii()
你现在应该能做到: - 清晰理解 | 管道符的底层机制,知道它为什么能工作 - 用 RunnablePassthrough.assign() 构建标准 RAG 数据流 - 用 RunnableParallel 实现多维度并行分析 - 用 RunnableBranch 构建智能路由链 - 为生产链添加重试和降级逻辑 - 分步调试链,快速定位类型不匹配等问题
RunnablePassthrough.assign()
《OutputParser——让模型输出变成可用的数据》
LCEL 链的末端通常都有一个 OutputParser。下一篇将深入介绍 LangChain 所有内置 Parser 的用法:
PydanticOutputParser
OutputFixingParser
代码仓库:本系列所有可运行代码示例统一维护在 GitHub,每篇对应独立目录,可直接克隆运行。 系列导航:[第一篇] → [第二篇] → [第三篇] → 第四篇(当前) → 第五篇 → ...
代码仓库:本系列所有可运行代码示例统一维护在 GitHub,每篇对应独立目录,可直接克隆运行。
系列导航:[第一篇] → [第二篇] → [第三篇] → 第四篇(当前) → 第五篇 → ...
还没有评论,来抢沙发吧!
博客管理员
40 篇文章
还没有评论,来抢沙发吧!