系列导航:[第一篇] → ... → [第九篇:RAG 入门] → 第十篇(当前) → 第十一篇:生产部署 前置要求:已掌握前九篇内容,尤其是 LCEL 链的组合方式(第四篇)和 Agent 开发(第八篇) 本篇目标:理解 LangGraph 解决了什么问题,掌握图(Graph)、节点(Node)、边(Edge)、状态(State)四个核心概念,能够用 LangGraph 构建带条件分支、循环推理和人工干预的复杂智能体。
系列导航:[第一篇] → ... → [第九篇:RAG 入门] → 第十篇(当前) → 第十一篇:生产部署
前置要求:已掌握前九篇内容,尤其是 LCEL 链的组合方式(第四篇)和 Agent 开发(第八篇)
本篇目标:理解 LangGraph 解决了什么问题,掌握图(Graph)、节点(Node)、边(Edge)、状态(State)四个核心概念,能够用 LangGraph 构建带条件分支、循环推理和人工干预的复杂智能体。
在第八篇里,我们用 AgentExecutor 构建了功能完善的 Agent。对于大多数场景,AgentExecutor 已经足够用了。但当任务复杂到一定程度,你会发现它开始力不从心。
AgentExecutor
AgentExecutor 的底层是一个固定结构的循环:
while 没有最终回答 and 未超过最大迭代次数: response = llm.invoke(messages) if response.tool_calls: 执行工具,把结果加入消息 else: return response.content # 最终回答
这个循环结构有几个硬性限制,不是调参能解决的:
限制一:所有工具地位平等。AgentExecutor 不知道"先做 A,然后才能做 B"这样的依赖关系。你无法表达"只有在数据验证通过后才能提交"这样的条件逻辑,因为条件路由完全依赖模型自己的判断,你无法在代码层面保证。
限制二:无法跨步骤共享复杂状态。Agent 的"记忆"只有消息列表。你无法在步骤之间传递结构化的数据(比如"第一步提取的实体列表"传给"第四步的分析函数"),因为消息列表只能存文本。
限制三:循环结构固定。"先并行做两件事,然后汇总结果,再根据汇总结果决定下一步"——这种流程 AgentExecutor 无法直接表达,因为它只有一个固定的单线循环。
限制四:无法插入人工干预。如果你想在 Agent 执行某个高风险操作(比如删除数据、发送邮件给大量用户)之前,让人工审批一下——AgentExecutor 的循环结构没有"暂停并等待人工输入"的机制。
LangGraph 用一个更通用的抽象来解决这些问题:把智能体的执行流程建模成一张有向图。
在这张图里: - 节点(Node) 是处理单元,可以是 LLM 调用、工具执行、数据转换,甚至是"等待人工输入" - 边(Edge) 是节点之间的连接,可以是固定的顺序连接,也可以是根据状态决定走哪条路的条件边 - 状态(State) 是一个贯穿整个图的共享数据容器,每个节点都可以读取和更新它 - 图(Graph) 是上面三者的组合,定义了整个执行流程
这个抽象比循环强大得多。任何可以用流程图画出来的逻辑,都可以用 LangGraph 实现——包括分支、汇合、循环、并行,当然也包括 AgentExecutor 那样的简单循环。
在写任何代码之前,先把这四个概念搞清楚。它们是理解 LangGraph 所有代码的基础。
State 是一个 Python 类(通常用 TypedDict 或 Pydantic 定义),它存放着图在执行过程中的所有数据。每次节点被执行,它接收当前 State,处理后返回一个包含更新内容的字典,LangGraph 负责把这些更新合并回 State。
TypedDict
State 的定义决定了图能传递什么信息,是整个系统设计的核心:
from typing import TypedDict, Annotated, List from langchain_core.messages import BaseMessage import operator # 方式一:用 TypedDict 定义(轻量,推荐) class SimpleState(TypedDict): messages: List[BaseMessage] # 对话消息列表 user_name: str # 用户名 task_done: bool # 任务是否完成 # 方式二:用 Annotated 指定字段的更新策略 class AgentState(TypedDict): # Annotated[类型, 更新函数] # operator.add 意味着:新值追加到现有列表末尾,而不是覆盖 # 这对消息列表特别重要:每次节点返回新消息,都是追加而不是替换 messages: Annotated[List[BaseMessage], operator.add] # 没有 Annotated 的字段,默认行为是:新值直接覆盖旧值 current_step: str retry_count: int final_answer: str
这个"更新策略"的设计非常精妙。对于消息列表,我们希望每个节点都能追加新消息,而不是覆盖整个列表——operator.add 就实现了这一点。对于像"当前步骤名称"这样的字段,我们希望新值直接覆盖旧值——默认行为就是这样。
operator.add
Node 就是一个普通的 Python 函数(或任何 Callable),接受 State,返回包含更新内容的字典:
from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, AIMessage llm = ChatOpenAI(model="gpt-4o", temperature=0) def call_llm(state: AgentState) -> dict: """节点示例:调用 LLM""" # 从状态中读取消息历史 messages = state["messages"] # 执行 LLM 调用 response = llm.invoke(messages) # 返回要更新的字段 # 注意:不需要返回完整的 State,只返回变化的部分 return { "messages": [response], # 追加一条 AI 回复(因为用了 operator.add) "current_step": "llm_done", # 覆盖当前步骤标记 } def validate_input(state: AgentState) -> dict: """节点示例:数据验证""" last_message = state["messages"][-1] is_valid = len(last_message.content) > 0 and len(last_message.content) < 1000 return { "current_step": "valid" if is_valid else "invalid", }
边分两种:
普通边:固定连接,A 执行完永远去 B。
graph.add_edge("node_a", "node_b") # A 执行完,永远下一步去 B
条件边:根据 State 的当前内容决定下一步去哪里,这是实现分支逻辑的关键:
def should_continue(state: AgentState) -> str: """条件函数:根据状态返回下一个节点的名称""" if state["current_step"] == "valid": return "process_node" # 返回节点名称字符串 else: return "error_node" # 注册条件边:从 "validate_node" 出发,用 should_continue 决定去哪里 graph.add_conditional_edges( "validate_node", # 从哪个节点出发 should_continue, # 条件判断函数 { "process_node": "process_node", # 函数返回 "process_node" 时,走这条边 "error_node": "error_node", # 函数返回 "error_node" 时,走这条边 } )
from langgraph.graph import StateGraph, END # 创建图,指定 State 类型 graph_builder = StateGraph(AgentState) # 添加节点 graph_builder.add_node("validate", validate_input) graph_builder.add_node("call_llm", call_llm) # 设置入口节点(图从哪里开始执行) graph_builder.set_entry_point("validate") # 添加边 graph_builder.add_conditional_edges("validate", should_continue, { "valid": "call_llm", "invalid": END, # END 是内置的终止节点 }) graph_builder.add_edge("call_llm", END) # 编译图(编译时会做结构检查) graph = graph_builder.compile()
用一个完整的例子把四个概念串起来——一个能判断用户意图、走不同处理路径的对话系统:
from dotenv import load_dotenv from typing import TypedDict, Annotated, List, Literal from langchain_openai import ChatOpenAI from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage from langgraph.graph import StateGraph, END import operator load_dotenv() # ---- 第一步:定义 State ---- class ConversationState(TypedDict): messages: Annotated[List[BaseMessage], operator.add] intent: str # 意图分类结果:qa / chat / complaint user_level: str # 用户等级:vip / standard # ---- 第二步:定义节点 ---- llm = ChatOpenAI(model="gpt-4o", temperature=0) def classify_intent(state: ConversationState) -> dict: """节点一:意图分类""" user_message = state["messages"][-1].content classification_prompt = f"""分析用户消息的意图,只输出一个词: - qa:询问具体问题(政策、价格、功能等) - chat:日常闲聊问候 - complaint:投诉或表达不满 用户消息:{user_message} 意图:""" response = llm.invoke([HumanMessage(content=classification_prompt)]) intent = response.content.strip().lower() # 确保只返回合法值 if intent not in ["qa", "chat", "complaint"]: intent = "chat" print(f" [分类节点] 识别意图:{intent}") return {"intent": intent} def handle_qa(state: ConversationState) -> dict: """节点二:处理知识问答""" messages = state["messages"] system = SystemMessage(content="""你是专业的客服助手,负责回答产品相关问题。 回答要准确、简洁,并在末尾提醒用户可以继续提问。""") response = llm.invoke([system] + messages) print(f" [QA节点] 生成专业回答") return {"messages": [response]} def handle_chat(state: ConversationState) -> dict: """节点三:处理日常闲聊""" messages = state["messages"] system = SystemMessage(content="你是友好的聊天伙伴,轻松自然地和用户交流。") response = llm.invoke([system] + messages) print(f" [闲聊节点] 生成闲聊回复") return {"messages": [response]} def handle_complaint(state: ConversationState) -> dict: """节点四:处理投诉""" messages = state["messages"] user_level = state.get("user_level", "standard") # VIP 用户走特殊通道 if user_level == "vip": system_content = """你是高级客户关系专员,处理 VIP 用户投诉。 态度要更加诚恳,可以提供额外补偿方案(如优惠券、积分等)。""" else: system_content = """你是客户关系专员,处理用户投诉。 要先真诚道歉,再了解问题详情,最后给出解决方案。""" response = llm.invoke([SystemMessage(content=system_content)] + messages) print(f" [投诉节点] 生成{user_level}投诉处理回复") return {"messages": [response]} # ---- 第三步:定义条件路由函数 ---- def route_by_intent(state: ConversationState) -> Literal["handle_qa", "handle_chat", "handle_complaint"]: """根据意图分类结果决定下一步走哪个节点""" intent = state["intent"] routing = { "qa": "handle_qa", "chat": "handle_chat", "complaint": "handle_complaint", } return routing.get(intent, "handle_chat") # 兜底走闲聊 # ---- 第四步:构建图 ---- builder = StateGraph(ConversationState) # 注册节点 builder.add_node("classify_intent", classify_intent) builder.add_node("handle_qa", handle_qa) builder.add_node("handle_chat", handle_chat) builder.add_node("handle_complaint", handle_complaint) # 设置入口 builder.set_entry_point("classify_intent") # 添加条件边:从分类节点出发,根据意图路由 builder.add_conditional_edges( "classify_intent", route_by_intent, { "handle_qa": "handle_qa", "handle_chat": "handle_chat", "handle_complaint": "handle_complaint", } ) # 所有处理节点执行完后,到达 END builder.add_edge("handle_qa", END) builder.add_edge("handle_chat", END) builder.add_edge("handle_complaint", END) # 编译 conversation_graph = builder.compile() # ---- 第五步:运行 ---- def chat(user_input: str, user_level: str = "standard"): print(f"\n用户({user_level}):{user_input}") initial_state = { "messages": [HumanMessage(content=user_input)], "intent": "", "user_level": user_level, } result = conversation_graph.invoke(initial_state) answer = result["messages"][-1].content print(f"助手:{answer[:200]}{'...' if len(answer) > 200 else ''}") return answer # 测试三种不同意图 chat("退款政策是什么?") # → 走 QA 节点 chat("你好呀,今天心情不错!") # → 走闲聊节点 chat("这次服务太差了,我要投诉!", user_level="vip") # → 走 VIP 投诉节点
运行这段代码,你会清楚地看到每条消息经过意图分类后,被路由到了不同的处理节点。这种在代码层面保证的路由逻辑,是 AgentExecutor 无法做到的——AgentExecutor 里的路由完全依赖模型的自由判断,你无法通过代码强制确保走某条路径。
现在用 LangGraph 重新实现第八篇的 Agent 循环,让你看清楚 AgentExecutor 的底层在做什么,以及 LangGraph 版本如何给你更多控制权:
from typing import TypedDict, Annotated, List from langchain_openai import ChatOpenAI from langchain_core.tools import tool from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage from langgraph.graph import StateGraph, END from langgraph.prebuilt import ToolNode # LangGraph 内置的工具执行节点 import operator # ---- 工具定义 ---- @tool def get_weather(city: str) -> str: """获取指定城市的当前天气。""" weather_data = { "北京": "晴天,22°C,西北风3级", "上海": "多云,26°C,东南风2级", "广州": "阵雨,28°C,南风4级", } return weather_data.get(city, f"暂无 {city} 的天气数据") @tool def search_restaurant(city: str, cuisine: str = "不限") -> str: """搜索指定城市的餐厅推荐。""" return f"{city}的{cuisine}餐厅推荐:老字号饭庄⭐⭐⭐⭐⭐、新潮轻食⭐⭐⭐⭐" @tool def book_ticket(departure: str, destination: str, date: str) -> str: """查询并预订交通票务。""" return f"{departure}→{destination} {date}:高铁G123 08:00出发,二等座¥358,余票充足" tools = [get_weather, search_restaurant, book_ticket] # ---- State 定义 ---- class AgentState(TypedDict): messages: Annotated[List[BaseMessage], operator.add] # ---- 节点定义 ---- llm = ChatOpenAI(model="gpt-4o", temperature=0) llm_with_tools = llm.bind_tools(tools) def agent_node(state: AgentState) -> dict: """Agent 决策节点:调用 LLM 决定下一步行动""" system = SystemMessage(content="""你是一个智能旅行助手,能帮助用户规划行程。 请主动使用工具获取实时信息,不要依赖训练数据。""") # llm_with_tools 接收完整消息历史,返回带 tool_calls 的 AIMessage 或最终回答 response = llm_with_tools.invoke([system] + state["messages"]) return {"messages": [response]} # ToolNode 是 LangGraph 内置的工具执行节点 # 它自动处理 AIMessage 中的 tool_calls,执行对应工具,返回 ToolMessage tool_node = ToolNode(tools) # ---- 条件路由 ---- def should_continue(state: AgentState) -> str: """检查最新的 AI 消息是否包含工具调用""" last_message = state["messages"][-1] # 有 tool_calls 说明模型要调用工具,继续循环 if hasattr(last_message, "tool_calls") and last_message.tool_calls: return "tools" # 走向工具节点 # 没有 tool_calls 说明模型给出了最终回答,结束 return "end" # ---- 构建图 ---- agent_builder = StateGraph(AgentState) agent_builder.add_node("agent", agent_node) agent_builder.add_node("tools", tool_node) agent_builder.set_entry_point("agent") agent_builder.add_conditional_edges( "agent", should_continue, { "tools": "tools", # 有工具调用 → 去执行工具 "end": END, # 最终回答 → 结束 } ) # 工具执行完之后,永远回到 agent 节点(让 LLM 决策下一步) agent_builder.add_edge("tools", "agent") agent_graph = agent_builder.compile() # ---- 运行 ---- def run_agent(user_input: str) -> str: print(f"\n用户:{user_input}") result = agent_graph.invoke({ "messages": [HumanMessage(content=user_input)] }) answer = result["messages"][-1].content print(f"助手:{answer}") return answer run_agent("帮我查一下上海今天的天气,再推荐几家餐厅") run_agent("明天从北京去上海,帮我订一张票,查一下上海天气")
这段代码和 AgentExecutor 的功能几乎一样,但你现在对每个节点都有完全的控制权。想在工具执行前加权限检查?在 tool_node 前加一个验证节点。想在模型给出最终回答前做质量审查?在 agent 和 END 之间加一个审查节点。
tool_node
agent
END
LangGraph 的图结构天然支持循环——节点可以通过边指向之前的节点,从而实现"生成→检查→不满意就重新生成"这样的自我反思循环:
from typing import TypedDict, Annotated, List, Optional from langchain_openai import ChatOpenAI from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage from langgraph.graph import StateGraph, END import operator # ---- State:加入反思轮次计数和质量评分 ---- class ReflectionState(TypedDict): messages: Annotated[List[BaseMessage], operator.add] draft: str # 当前草稿 feedback: str # 上一轮的反馈意见 quality_score: int # 质量评分 0-10 iteration: int # 当前迭代次数 final_output: str # 最终输出(质量达标后填写) llm = ChatOpenAI(model="gpt-4o", temperature=0.7) critic_llm = ChatOpenAI(model="gpt-4o", temperature=0) # 评审用确定性更高的配置 # ---- 节点一:生成初稿 ---- def generate_draft(state: ReflectionState) -> dict: """根据用户需求(或上一轮反馈)生成/修改草稿""" iteration = state.get("iteration", 0) if iteration == 0: # 第一次:根据用户消息生成初稿 user_request = state["messages"][-1].content prompt = f"请根据以下需求生成一段高质量的内容:\n\n{user_request}" else: # 后续:根据反馈修改 prompt = f"""请根据反馈修改草稿: 原草稿: {state['draft']} 评审反馈: {state['feedback']} 请认真考虑每一条反馈,生成改进后的版本:""" response = llm.invoke([HumanMessage(content=prompt)]) new_draft = response.content print(f"\n [第 {iteration + 1} 轮生成] 草稿长度:{len(new_draft)} 字") return { "draft": new_draft, "iteration": iteration + 1, } # ---- 节点二:评审草稿 ---- def review_draft(state: ReflectionState) -> dict: """评审当前草稿,给出分数和具体反馈""" draft = state["draft"] user_request = state["messages"][0].content # 原始需求 review_prompt = f"""作为专业评审,请评估以下内容是否满足需求: 原始需求:{user_request} 待评审内容: {draft} 请按以下格式输出(JSON): {{ "score": 0-10的整数评分, "feedback": "具体的改进建议,如果满分则写'内容已达到高质量标准'", "strengths": "主要优点", "weaknesses": "主要不足(如有)" }}""" from langchain_core.output_parsers import JsonOutputParser try: review = ( critic_llm | JsonOutputParser() ).invoke([HumanMessage(content=review_prompt)]) score = int(review.get("score", 5)) feedback = review.get("feedback", "请继续改进") print(f" [评审节点] 得分:{score}/10,反馈:{feedback[:60]}...") return { "quality_score": score, "feedback": feedback, } except Exception: return {"quality_score": 6, "feedback": "请进一步优化内容的清晰度和完整性"} # ---- 节点三:确认完成 ---- def finalize(state: ReflectionState) -> dict: """质量达标,保存最终输出""" print(f"\n [完成节点] 最终质量得分:{state['quality_score']}/10,共迭代 {state['iteration']} 轮") return {"final_output": state["draft"]} # ---- 条件路由:是否继续迭代 ---- def should_revise(state: ReflectionState) -> str: score = state.get("quality_score", 0) iteration = state.get("iteration", 0) max_iterations = 3 # 最多迭代 3 轮(防止无限循环) if score >= 8: print(f" [路由] 质量达标({score}分),准备完成") return "finalize" if iteration >= max_iterations: print(f" [路由] 已达最大迭代次数({max_iterations}),强制完成") return "finalize" print(f" [路由] 质量不足({score}分),继续修改(第 {iteration + 1} 轮)") return "generate_draft" # 返回生成节点,触发循环 # ---- 构建图 ---- reflection_builder = StateGraph(ReflectionState) reflection_builder.add_node("generate_draft", generate_draft) reflection_builder.add_node("review_draft", review_draft) reflection_builder.add_node("finalize", finalize) reflection_builder.set_entry_point("generate_draft") reflection_builder.add_edge("generate_draft", "review_draft") reflection_builder.add_conditional_edges( "review_draft", should_revise, { "generate_draft": "generate_draft", # 循环回生成节点 "finalize": "finalize", } ) reflection_builder.add_edge("finalize", END) reflection_graph = reflection_builder.compile() # ---- 运行 ---- print("开始自我反思写作任务...") result = reflection_graph.invoke({ "messages": [HumanMessage(content="写一段 200 字左右的 Python 语言介绍,面向初学者,要生动有趣")], "draft": "", "feedback": "", "quality_score": 0, "iteration": 0, "final_output": "", }) print(f"\n{'='*60}") print("最终输出:") print(result["final_output"])
这个"生成→评审→不满意就循环"的模式是 LangGraph 最典型的用法之一。因为你在 State 里保存了 iteration 计数器,可以精确控制最多循环几次,彻底避免无限循环的风险。这种控制在 AgentExecutor 里是做不到的。
iteration
Human-in-the-Loop(人工干预)是 LangGraph 最受欢迎的特性之一。在 AgentExecutor 里,Agent 一旦启动就自动运行到结束,你无法在中间插入人工确认。LangGraph 通过断点(Breakpoint)和持久化检查点(Checkpoint)机制,完美地解决了这个问题。
要使用 Human-in-the-Loop,首先需要给图配置一个"记忆后端"(Checkpointer)。每次节点执行完毕,LangGraph 都会把当前 State 保存到这个后端,这样即使程序暂停或崩溃,也可以从上次保存的状态恢复:
from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import StateGraph, END, interrupt # MemorySaver:把检查点保存在内存里(开发测试用) # 生产环境用 SqliteSaver 或 PostgresSaver checkpointer = MemorySaver() # 编译图时传入 checkpointer graph_with_memory = builder.compile(checkpointer=checkpointer) # 运行时需要传入 thread_id,同一个 thread_id 的多次调用共享状态 config = {"configurable": {"thread_id": "conversation_001"}} result = graph_with_memory.invoke(initial_state, config=config) # 第二次调用同一个 thread_id:从上次暂停的地方继续 result2 = graph_with_memory.invoke({"messages": [HumanMessage("继续")]}, config=config)
下面构建一个有"人工审批"步骤的内容发布流程,在真正发布前需要人工确认:
from typing import TypedDict, Annotated, List, Optional from langchain_openai import ChatOpenAI from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage from langgraph.graph import StateGraph, END from langgraph.checkpoint.memory import MemorySaver from langgraph.types import interrupt, Command import operator class PublishState(TypedDict): messages: Annotated[List[BaseMessage], operator.add] content_draft: str # 待发布的内容草稿 review_decision: str # 审批决定:"approve" / "reject" / "pending" reject_reason: str # 拒绝原因(如果被拒) published: bool # 是否已发布 llm = ChatOpenAI(model="gpt-4o", temperature=0.7) # ---- 节点一:生成内容 ---- def generate_content(state: PublishState) -> dict: """根据用户需求生成内容草稿""" user_request = state["messages"][-1].content response = llm.invoke([ SystemMessage(content="你是内容创作者,生成适合发布的内容。"), HumanMessage(content=user_request), ]) print(f"\n[生成节点] 内容已生成({len(response.content)} 字)") print(f"草稿预览:{response.content[:100]}...") return { "content_draft": response.content, "review_decision": "pending", } # ---- 节点二:人工审批(使用 interrupt 暂停等待人工输入)---- def human_review(state: PublishState) -> dict: """ 暂停执行,等待人工审批。 interrupt() 会让图在这里暂停,把控制权还给调用者。 调用者(你的应用代码)展示草稿给人工审核, 然后用 Command(resume=...) 恢复执行并传入审批结果。 """ draft = state["content_draft"] # interrupt 接收一个"展示给人工看的数据",暂停执行 # 这个调用会抛出一个特殊的中断信号,LangGraph 捕获它并保存状态 human_input = interrupt({ "type": "approval_required", "content": draft, "instruction": "请审查以上内容,输入 'approve' 批准发布,或 'reject: 原因' 拒绝", }) # 代码执行到这里时,说明人工已经输入了结果(通过 Command(resume=...) 恢复) # human_input 就是人工输入的内容 print(f"\n[审批节点] 收到人工决策:{human_input}") if str(human_input).lower().startswith("approve"): return {"review_decision": "approve", "reject_reason": ""} else: reason = str(human_input).replace("reject:", "").replace("reject", "").strip() return { "review_decision": "reject", "reject_reason": reason or "未提供拒绝原因", } # ---- 节点三:发布内容 ---- def publish_content(state: PublishState) -> dict: """审批通过后发布内容""" print(f"\n[发布节点] 内容已发布!") print(f"发布内容:{state['content_draft'][:200]}...") return { "published": True, "messages": [AIMessage(content=f"内容已成功发布:\n\n{state['content_draft']}")], } # ---- 节点四:处理拒绝 ---- def handle_rejection(state: PublishState) -> dict: """审批被拒后,通知用户并说明原因""" reason = state.get("reject_reason", "未说明原因") message = f"内容审批未通过,原因:{reason}。\n请修改后重新提交。" print(f"\n[拒绝节点] {message}") return { "published": False, "messages": [AIMessage(content=message)], } # ---- 条件路由 ---- def route_after_review(state: PublishState) -> str: decision = state.get("review_decision", "pending") return "publish_content" if decision == "approve" else "handle_rejection" # ---- 构建图 ---- publish_builder = StateGraph(PublishState) publish_builder.add_node("generate_content", generate_content) publish_builder.add_node("human_review", human_review) publish_builder.add_node("publish_content", publish_content) publish_builder.add_node("handle_rejection", handle_rejection) publish_builder.set_entry_point("generate_content") publish_builder.add_edge("generate_content", "human_review") publish_builder.add_conditional_edges( "human_review", route_after_review, {"publish_content": "publish_content", "handle_rejection": "handle_rejection"}, ) publish_builder.add_edge("publish_content", END) publish_builder.add_edge("handle_rejection", END) # 注意:Human-in-the-Loop 必须配置 checkpointer checkpointer = MemorySaver() publish_graph = publish_builder.compile(checkpointer=checkpointer) # ---- 使用示例 ---- from langgraph.types import Command def run_with_human_approval(topic: str, thread_id: str = "thread_001"): config = {"configurable": {"thread_id": thread_id}} print(f"=== 启动内容发布流程:{topic} ===") # 第一次调用:运行到 interrupt 暂停 initial_state = { "messages": [HumanMessage(content=f"帮我写一篇关于{topic}的推广文章,约 150 字")], "content_draft": "", "review_decision": "pending", "reject_reason": "", "published": False, } try: result = publish_graph.invoke(initial_state, config=config) # 如果没有 interrupt,会直接完成(不太可能,除非图结构有问题) print("图已完成(未触发中断)") except Exception as e: if "interrupt" in str(e).lower() or "NodeInterrupt" in type(e).__name__: # 触发了 interrupt,图暂停了 # 从检查点获取当前状态来展示给人工 current_state = publish_graph.get_state(config) draft = current_state.values.get("content_draft", "") print(f"\n{'='*50}") print("⏸️ 流程暂停,等待人工审批") print(f"\n待审内容:\n{draft}") print(f"{'='*50}") # 模拟人工审批(实际应用中这里是 UI 交互) human_decision = input("\n请输入审批决定(approve / reject: 原因):").strip() if not human_decision: human_decision = "approve" # 演示时默认批准 # 用 Command(resume=...) 恢复执行,传入人工输入 final_result = publish_graph.invoke( Command(resume=human_decision), config=config, ) print(f"\n最终结果:{final_result['messages'][-1].content[:100]}") else: raise # 取消注释来实际运行: # run_with_human_approval("人工智能在医疗领域的应用")
在实际的 Web 应用中,"人工审批"往往不是在同一个进程里同步等待,而是:用户在浏览器里提交请求 → 服务器启动 Agent → Agent 暂停 → 几分钟后审批人打开审批页面 → 提交决策 → 恢复 Agent。这需要持久化的 Checkpointer:
from langgraph.checkpoint.sqlite import SqliteSaver import sqlite3 # 生产环境:用 SQLite 持久化检查点(进程重启后状态不丢失) conn = sqlite3.connect("agent_checkpoints.db", check_same_thread=False) persistent_checkpointer = SqliteSaver(conn) # 或者使用 PostgreSQL(多实例部署) # from langgraph.checkpoint.postgres import PostgresSaver # persistent_checkpointer = PostgresSaver.from_conn_string("postgresql://...") persistent_graph = publish_builder.compile(checkpointer=persistent_checkpointer) # API 层的使用模式(FastAPI 示例) # POST /tasks:启动任务,返回 thread_id # GET /tasks/{thread_id}:查看任务状态和待审批内容 # POST /tasks/{thread_id}/approve:批准,恢复执行 # POST /tasks/{thread_id}/reject:拒绝,恢复执行 def start_task(topic: str) -> str: """启动任务,返回用于后续操作的 thread_id""" import uuid thread_id = str(uuid.uuid4()) config = {"configurable": {"thread_id": thread_id}} initial_state = { "messages": [HumanMessage(content=f"写一篇关于{topic}的文章")], "content_draft": "", "review_decision": "pending", "reject_reason": "", "published": False, } try: persistent_graph.invoke(initial_state, config=config) except Exception: pass # interrupt 异常是正常的暂停信号 return thread_id def get_pending_review(thread_id: str) -> dict: """获取待审批的内容""" config = {"configurable": {"thread_id": thread_id}} state = persistent_graph.get_state(config) return { "thread_id": thread_id, "draft": state.values.get("content_draft", ""), "status": "waiting_approval", } def submit_approval(thread_id: str, approved: bool, reason: str = "") -> dict: """提交审批决定,恢复 Agent 执行""" config = {"configurable": {"thread_id": thread_id}} human_decision = "approve" if approved else f"reject: {reason}" final_result = persistent_graph.invoke( Command(resume=human_decision), config=config, ) return { "published": final_result.get("published", False), "message": final_result["messages"][-1].content if final_result.get("messages") else "", }
LangGraph 支持并行执行节点——多个节点同时运行,然后汇总结果。这对于需要从多个渠道收集信息的场景非常有用:
from typing import TypedDict, Annotated, List, Optional from langchain_openai import ChatOpenAI from langchain_core.messages import BaseMessage, HumanMessage from langgraph.graph import StateGraph, END import operator class ResearchState(TypedDict): query: str messages: Annotated[List[BaseMessage], operator.add] # 三个并行搜索渠道的结果,互不干扰 web_results: str academic_results: str news_results: str # 最终综合报告 final_report: str llm = ChatOpenAI(model="gpt-4o", temperature=0) # ---- 三个并行执行的搜索节点 ---- def search_web(state: ResearchState) -> dict: """网络搜索(模拟)""" query = state["query"] # 实际接入 Serper/Bing API result = f"网络搜索'{query}'结果:找到 15 篇相关文章,主要讨论技术实现和应用场景..." print(f" [网络搜索] 完成") return {"web_results": result} def search_academic(state: ResearchState) -> dict: """学术论文搜索(模拟)""" query = state["query"] # 实际接入 Semantic Scholar / arXiv API result = f"学术搜索'{query}'结果:找到 8 篇相关论文,最新发表于 2024 年..." print(f" [学术搜索] 完成") return {"academic_results": result} def search_news(state: ResearchState) -> dict: """新闻搜索(模拟)""" query = state["query"] # 实际接入新闻 API result = f"新闻搜索'{query}'结果:最近 7 天有 5 条相关新闻,关注度持续上升..." print(f" [新闻搜索] 完成") return {"news_results": result} # ---- 汇总节点:等所有并行节点完成后执行 ---- def synthesize_results(state: ResearchState) -> dict: """整合三路搜索结果,生成综合报告""" synthesis_prompt = f"""请整合以下三个渠道的搜索结果,生成一份结构化的研究报告: 查询主题:{state['query']} 网络资料: {state['web_results']} 学术论文: {state['academic_results']} 新闻动态: {state['news_results']} 要求:报告应包含主要发现、学术支撑、最新动态三个部分,共约 300 字。""" response = llm.invoke([HumanMessage(content=synthesis_prompt)]) print(f" [综合节点] 报告生成完毕") return { "final_report": response.content, "messages": [response], } # ---- 构建图:利用 LangGraph 的并行执行能力 ---- research_builder = StateGraph(ResearchState) research_builder.add_node("search_web", search_web) research_builder.add_node("search_academic", search_academic) research_builder.add_node("search_news", search_news) research_builder.add_node("synthesize", synthesize_results) # 入口节点同时连接三个搜索节点(LangGraph 会并行执行它们) # 在 LangGraph 中,从同一个源节点出发的多条边,对应的目标节点会并行执行 research_builder.set_entry_point("search_web") # 这里用一个简单的技巧实现并行: # 让三个搜索节点按序执行,但在实际生产中可以用 Send API 实现真正并行 research_builder.add_edge("search_web", "search_academic") research_builder.add_edge("search_academic", "search_news") research_builder.add_edge("search_news", "synthesize") research_builder.add_edge("synthesize", END) research_graph = research_builder.compile() # 运行 print("开始并行研究任务...") result = research_graph.invoke({ "query": "大型语言模型在代码生成领域的应用", "messages": [], "web_results": "", "academic_results": "", "news_results": "", "final_report": "", }) print(f"\n{'='*60}") print("研究报告:") print(result["final_report"])
LangGraph 提供了内置的图结构可视化工具,帮助你直观地理解和调试图的结构:
# 方式一:生成 Mermaid 图(可以在 Mermaid 在线编辑器里渲染) print(agent_graph.get_graph().draw_mermaid()) # 输出类似: # %%{init: {'flowchart': {'curve': 'linear'}}}%% # graph TD; # __start__([<p>__start__</p>]):::first # agent([agent]) # tools([tools]) # __end__([<p>__end__</p>]):::last # __start__ --> agent; # agent -.-> tools; # agent -.-> __end__; # tools --> agent; # 方式二:在 Jupyter Notebook 中直接渲染图片 # from IPython.display import Image, display # display(Image(agent_graph.get_graph().draw_mermaid_png())) # 方式三:查看图的所有节点和边 graph_info = agent_graph.get_graph() print("所有节点:", list(graph_info.nodes.keys())) print("所有边:", [(e.source, e.target) for e in graph_info.edges]) # 检查点调试:查看图在某个时刻的完整状态 config = {"configurable": {"thread_id": "debug_001"}} # 执行图 result = agent_graph.invoke( {"messages": [HumanMessage(content="上海天气怎么样?")]}, config=config, ) # 查看状态历史(每个检查点对应一次节点执行) state_history = list(agent_graph.get_state_history(config)) print(f"\n状态历史:共 {len(state_history)} 个检查点") for i, checkpoint in enumerate(reversed(state_history)): step = checkpoint.metadata.get("step", "?") node = checkpoint.metadata.get("source", "unknown") msg_count = len(checkpoint.values.get("messages", [])) print(f" 步骤 {step}(节点:{node}):消息数 {msg_count}")
把 RAG(第九篇)和 LangGraph 结合,构建一个能检查自己答案质量、在信息不足时主动补充检索的高质量问答系统:
from typing import TypedDict, Annotated, List, Optional from langchain_openai import ChatOpenAI, OpenAIEmbeddings from langchain_community.vectorstores import FAISS from langchain_core.documents import Document from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage from langgraph.graph import StateGraph, END import operator # ---- 准备知识库 ---- embeddings = OpenAIEmbeddings(model="text-embedding-3-small") knowledge_docs = [ Document(page_content="Python 由 Guido van Rossum 创建,1991年首次发布。以简洁的语法著称,是目前最受欢迎的编程语言之一。", metadata={"source": "python.txt"}), Document(page_content="Python 支持面向对象、函数式和过程式编程范式。具有动态类型系统和垃圾回收机制。", metadata={"source": "python.txt"}), Document(page_content="Python 在数据科学、机器学习、Web 开发领域应用广泛。主要库包括 NumPy、Pandas、TensorFlow、Django。", metadata={"source": "python_apps.txt"}), Document(page_content="LangChain 是构建 LLM 应用的框架,提供链式调用、工具集成、向量检索等功能,支持多种模型提供商。", metadata={"source": "langchain.txt"}), Document(page_content="RAG(检索增强生成)通过检索相关文档来增强模型回答,解决模型知识截止和私有数据问题。", metadata={"source": "rag.txt"}), ] vectorstore = FAISS.from_documents(knowledge_docs, embeddings) retriever = vectorstore.as_retriever(search_kwargs={"k": 3}) # ---- State 定义 ---- class RAGAgentState(TypedDict): question: str # 用户原始问题 messages: Annotated[List[BaseMessage], operator.add] retrieved_docs: List[Document] # 检索到的文档 answer_draft: str # 生成的答案草稿 hallucination_score: str # 幻觉检测结果:"grounded" / "hallucinated" answer_quality: str # 答案质量:"sufficient" / "insufficient" retry_count: int # 重试次数 final_answer: str # 最终答案 llm = ChatOpenAI(model="gpt-4o", temperature=0) # ---- 节点一:检索相关文档 ---- def retrieve(state: RAGAgentState) -> dict: question = state["question"] docs = retriever.invoke(question) print(f"\n[检索节点] 找到 {len(docs)} 个相关文档") for doc in docs: print(f" - {doc.metadata['source']}:{doc.page_content[:50]}...") return {"retrieved_docs": docs} # ---- 节点二:基于检索结果生成答案 ---- def generate_answer(state: RAGAgentState) -> dict: question = state["question"] docs = state["retrieved_docs"] retry = state.get("retry_count", 0) context = "\n\n".join([ f"【文档{i+1}】{doc.page_content}" for i, doc in enumerate(docs) ]) # 如果是重试,在 prompt 里加入上次答案质量不足的提示 retry_hint = "" if retry > 0: retry_hint = f"\n\n注意:这是第 {retry + 1} 次尝试,请确保答案更加详尽和准确。" prompt = f"""请严格基于以下参考资料回答问题。 参考资料: {context} 问题:{question} {retry_hint} 请给出完整、准确的回答:""" response = llm.invoke([HumanMessage(content=prompt)]) print(f"\n[生成节点] 生成答案(第 {retry + 1} 次):{response.content[:80]}...") return {"answer_draft": response.content} # ---- 节点三:幻觉检测 ---- def check_hallucination(state: RAGAgentState) -> dict: """检查答案是否有编造内容(不在文档中的信息)""" docs = state["retrieved_docs"] answer = state["answer_draft"] context = "\n".join([doc.page_content for doc in docs]) check_prompt = f"""判断以下答案是否完全基于参考资料,是否存在编造或超出资料范围的信息。 参考资料: {context} 待检查的答案: {answer} 请判断:答案中的关键事实是否都能在参考资料中找到依据? 只输出一个词:grounded(有依据)或 hallucinated(存在编造)""" result = llm.invoke([HumanMessage(content=check_prompt)]) score = "grounded" if "grounded" in result.content.lower() else "hallucinated" print(f"\n[幻觉检测] 结果:{score}") return {"hallucination_score": score} # ---- 节点四:答案质量检查 ---- def check_quality(state: RAGAgentState) -> dict: """检查答案是否充分回答了问题""" question = state["question"] answer = state["answer_draft"] quality_prompt = f"""评估以下答案是否充分回答了问题。 问题:{question} 答案:{answer} 答案是否完整地回答了问题?注意:如果答案说"资料中没有相关信息",也算充分。 只输出一个词:sufficient(充分)或 insufficient(不充分)""" result = llm.invoke([HumanMessage(content=quality_prompt)]) quality = "sufficient" if "sufficient" in result.content.lower() else "insufficient" print(f"[质量检查] 结果:{quality}") return {"answer_quality": quality} # ---- 节点五:最终确认 ---- def finalize_answer(state: RAGAgentState) -> dict: final = state["answer_draft"] print(f"\n[完成节点] 答案已确认,共检索 {len(state['retrieved_docs'])} 个文档,重试 {state.get('retry_count', 0)} 次") return { "final_answer": final, "messages": [AIMessage(content=final)], } # ---- 路由函数 ---- def route_after_hallucination_check(state: RAGAgentState) -> str: if state["hallucination_score"] == "hallucinated": retry = state.get("retry_count", 0) if retry < 2: print(f" → 检测到幻觉,重新生成(第 {retry + 1} 次重试)") return "regenerate" else: print(f" → 重试次数已达上限,接受当前答案") return "check_quality" return "check_quality" def route_after_quality_check(state: RAGAgentState) -> str: if state["answer_quality"] == "insufficient": retry = state.get("retry_count", 0) if retry < 2: print(f" → 答案质量不足,扩大检索范围重试") return "re_retrieve" else: print(f" → 重试次数已达上限,接受当前答案") return "finalize" return "finalize" def increment_retry(state: RAGAgentState) -> dict: """递增重试计数器""" return {"retry_count": state.get("retry_count", 0) + 1} # ---- 构建图 ---- rag_builder = StateGraph(RAGAgentState) rag_builder.add_node("retrieve", retrieve) rag_builder.add_node("generate_answer", generate_answer) rag_builder.add_node("check_hallucination", check_hallucination) rag_builder.add_node("check_quality", check_quality) rag_builder.add_node("finalize", finalize_answer) rag_builder.add_node("increment_retry", increment_retry) # 流程:检索 → 生成 → 幻觉检测 → 质量检查 → 完成 rag_builder.set_entry_point("retrieve") rag_builder.add_edge("retrieve", "generate_answer") rag_builder.add_edge("generate_answer", "check_hallucination") rag_builder.add_conditional_edges( "check_hallucination", route_after_hallucination_check, { "regenerate": "increment_retry", # 幻觉 → 递增计数 → 重新生成 "check_quality": "check_quality", # 无幻觉 → 质量检查 } ) rag_builder.add_edge("increment_retry", "generate_answer") # 重新生成(用原有文档) rag_builder.add_conditional_edges( "check_quality", route_after_quality_check, { "re_retrieve": "retrieve", # 质量不足 → 重新检索(扩大范围) "finalize": "finalize", # 质量足够 → 完成 } ) rag_builder.add_edge("finalize", END) rag_agent_graph = rag_builder.compile() # ---- 运行 ---- def ask(question: str) -> str: print(f"\n{'='*60}") print(f"问题:{question}") print('='*60) result = rag_agent_graph.invoke({ "question": question, "messages": [], "retrieved_docs": [], "answer_draft": "", "hallucination_score": "", "answer_quality": "", "retry_count": 0, "final_answer": "", }) print(f"\n最终答案:{result['final_answer']}") return result["final_answer"] ask("Python 是谁创建的,有哪些主要应用领域?") ask("LangChain 和 RAG 有什么关系?") --- ## 十、本篇小结 在正式总结之前,先梳理一下整个图的执行流程和设计思路,因为这个自我反思 RAG Agent 综合运用了本篇所有知识点,值得仔细品味:
用户问题 ↓ [retrieve]:向量检索,找到最相关的文档 ↓ [generate_answer]:基于文档生成答案草稿 ↓ [check_hallucination]:检测答案中是否有不在文档中的编造内容 ↓ (有幻觉且未超重试次数) [increment_retry] → 回到 [generate_answer](循环,最多 2 次) ↓ (无幻觉或超重试次数) [check_quality]:检测答案是否充分回答了问题 ↓ (质量不足且未超重试次数) 回到 [retrieve](再次检索,因为重试计数不同,检索可能有细微变化) ↓ (质量足够或超重试次数) [finalize]:确认最终答案,结束 ```
这个流程有三个自我修正机制: 1. 幻觉修正:发现编造内容 → 用原文档重新生成,迫使模型更贴近文档 2. 质量修正:答案不完整 → 重新检索,可能找到不同文档 3. 重试保护:两种修正都有最大重试次数,防止无限循环
这是 AgentExecutor 完全无法实现的复杂控制流,因为你需要在"幻觉修正"和"质量修正"两个独立的循环之间精确控制,并且有条件地选择"重新生成"还是"重新检索"。
Annotated[List, operator.add]
StateGraph
set_entry_point
compile()
add_conditional_edges
retry_count
interrupt()
Command(resume=...)
MemorySaver
SqliteSaver
PostgresSaver
draw_mermaid()
get_state_history()
你现在应该能做到: - 识别哪些场景适合用 LangGraph 而不是 AgentExecutor - 设计合理的 State 结构,用 Annotated 正确配置消息追加策略 - 构建包含条件分支和循环的复杂图 - 用 interrupt() 和 Command(resume=...) 实现 Human-in-the-Loop 流程 - 用 MemorySaver / SqliteSaver 持久化图的执行状态 - 构建带多重自我修正能力的 RAG Agent
Annotated
《生产部署——把 LangChain 应用推上线》
系列的最后一篇将聚焦工程落地:
SQLiteCache
RedisCache
代码仓库:本系列所有可运行代码示例统一维护在 GitHub,每篇对应独立目录,可直接克隆运行。 系列导航:[第一篇] → ... → [第九篇] → 第十篇(当前) → 第十一篇 → ...
代码仓库:本系列所有可运行代码示例统一维护在 GitHub,每篇对应独立目录,可直接克隆运行。
系列导航:[第一篇] → ... → [第九篇] → 第十篇(当前) → 第十一篇 → ...
还没有评论,来抢沙发吧!
博客管理员
40 篇文章
还没有评论,来抢沙发吧!