系列导航:[第一篇] → [第二篇] → [第三篇] → [第四篇] → [第五篇] → 第六篇(当前) → 第七篇:工具调用 前置要求:已掌握前五篇内容,理解 LCEL 链的组合方式和 ChatPromptTemplate 的 MessagesPlaceholder 本篇目标:彻底解决多轮对话的上下文管理问题。从"为什么大模型没有记忆"到"生产级多用户会话管理",覆盖 LangChain 1.0 的官方 Memory 方案、多种存储后端、消息裁剪策略,以及跨会话持久化的完整实现。
系列导航:[第一篇] → [第二篇] → [第三篇] → [第四篇] → [第五篇] → 第六篇(当前) → 第七篇:工具调用
前置要求:已掌握前五篇内容,理解 LCEL 链的组合方式和 ChatPromptTemplate 的 MessagesPlaceholder
本篇目标:彻底解决多轮对话的上下文管理问题。从"为什么大模型没有记忆"到"生产级多用户会话管理",覆盖 LangChain 1.0 的官方 Memory 方案、多种存储后端、消息裁剪策略,以及跨会话持久化的完整实现。
如果你直接用原生 API 做多轮对话,大概率踩过这个坑:
from openai import OpenAI client = OpenAI() # 第一轮 response1 = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": "我叫张伟,是一名 Python 工程师"}] ) print(response1.choices[0].message.content) # "你好,张伟!很高兴认识你,作为一名 Python 工程师..." # 第二轮——直接发新消息 response2 = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": "我之前说我是做什么的?"}] ) print(response2.choices[0].message.content) # "我不知道你之前说了什么,每次对话对我来说都是全新的..."
根本原因:大模型是无状态的 HTTP 服务。每次 API 调用都是一个完全独立的请求,模型对之前发生的任何事情一无所知。
正确的多轮对话实现方式是手动维护历史消息,每次请求都把完整的对话历史发过去:
# 正确做法:维护 messages 列表 messages = [] def chat(user_input: str) -> str: messages.append({"role": "user", "content": user_input}) response = client.chat.completions.create( model="gpt-4o", messages=messages # 每次都发完整历史 ) assistant_reply = response.choices[0].message.content messages.append({"role": "assistant", "content": assistant_reply}) return assistant_reply print(chat("我叫张伟,是 Python 工程师")) print(chat("我是做什么的?")) # 这次能正确回答了
这个手写方案虽然能用,但在工程上有五个问题:
messages
LangChain 的 Memory 模块解决的正是这些问题。
LangChain 0.x 版本提供了 ConversationBufferMemory、ConversationSummaryMemory 等一系列 Memory 类,但这些类在 1.0 版本中已经被标记为遗留(Legacy),官方推荐使用 RunnableWithMessageHistory 作为替代方案。
ConversationBufferMemory
ConversationSummaryMemory
RunnableWithMessageHistory
RunnableWithMessageHistory 的设计非常优雅:它是一个 Runnable 的"包装器",把任意 LCEL 链包装成具有历史记忆能力的链。
┌─────────────────────────────────────┐ │ RunnableWithMessageHistory │ │ │ 用户输入 + session_id──►│ 1. 从 ChatMessageHistory 加载历史 │ │ 2. 把历史注入到 MessagesPlaceholder │ │ 3. 执行内部链 │──► 模型回复 │ 4. 把新的消息对保存到 History │ │ │ │ 内部链:prompt | llm | parser │ └─────────────────────────────────────┘ │ ChatMessageHistory(存储后端) ├── InMemoryChatMessageHistory(内存) ├── RedisChatMessageHistory(Redis) ├── SQLChatMessageHistory(数据库) └── 自定义后端
from dotenv import load_dotenv from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.output_parsers import StrOutputParser from langchain_core.chat_history import InMemoryChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory load_dotenv() # Step 1:定义带历史占位符的链 prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个有帮助的助手,能记住对话内容。"), MessagesPlaceholder(variable_name="history"), # 历史消息插入点 ("human", "{input}"), ]) llm = ChatOpenAI(model="gpt-4o") chain = prompt | llm | StrOutputParser() # Step 2:准备 session -> history 的存储 # 这个字典充当"内存数据库",键是 session_id,值是对应的历史对象 session_store: dict[str, InMemoryChatMessageHistory] = {} def get_session_history(session_id: str) -> InMemoryChatMessageHistory: """根据 session_id 获取或创建对应的历史记录""" if session_id not in session_store: session_store[session_id] = InMemoryChatMessageHistory() return session_store[session_id] # Step 3:用 RunnableWithMessageHistory 包装链 chain_with_history = RunnableWithMessageHistory( chain, get_session_history, # 传入获取历史的函数 input_messages_key="input", # 链输入中,"当前用户消息"对应的键 history_messages_key="history", # prompt 中 MessagesPlaceholder 的 variable_name ) # Step 4:调用时传入 session_id config = {"configurable": {"session_id": "user_001"}} r1 = chain_with_history.invoke({"input": "我叫张伟,是 Python 工程师"}, config=config) print(r1) # "你好,张伟!..." r2 = chain_with_history.invoke({"input": "我是做什么的?"}, config=config) print(r2) # "你是一名 Python 工程师。" # 不同 session_id:完全独立的历史 config2 = {"configurable": {"session_id": "user_002"}} r3 = chain_with_history.invoke({"input": "我是做什么的?"}, config=config2) print(r3) # "我不知道你是做什么的,你还没有告诉我。"
# 查看 user_001 的历史 history = session_store["user_001"] for msg in history.messages: role = "用户" if msg.__class__.__name__ == "HumanMessage" else "助手" print(f"[{role}]: {msg.content}") # [用户]: 我叫张伟,是 Python 工程师 # [助手]: 你好,张伟!很高兴认识你,作为一名 Python 工程师... # [用户]: 我是做什么的? # [助手]: 你是一名 Python 工程师。
InMemoryChatMessageHistory 只在进程运行期间有效,进程重启后历史全部丢失。生产环境需要持久化存储。
InMemoryChatMessageHistory
from langchain_community.chat_message_histories import SQLChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory def get_sqlite_history(session_id: str) -> SQLChatMessageHistory: """使用 SQLite 持久化存储""" return SQLChatMessageHistory( session_id=session_id, connection_string="sqlite:///chat_history.db", # 也可以用 PostgreSQL: # connection_string="postgresql://user:pass@localhost/mydb" ) chain_with_history = RunnableWithMessageHistory( chain, get_sqlite_history, input_messages_key="input", history_messages_key="history", ) # 调用方式完全相同,历史会自动写入 SQLite 文件 config = {"configurable": {"session_id": "user_001"}} r1 = chain_with_history.invoke({"input": "你好,我叫李明"}, config=config) # 进程重启后再次调用,历史依然存在 r2 = chain_with_history.invoke({"input": "我叫什么名字?"}, config=config) print(r2) # "你叫李明。"
Redis 是生产环境最常用的会话存储方案,支持 TTL(自动过期),天然适合存储有时效性的对话历史:
from langchain_community.chat_message_histories import RedisChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory def get_redis_history(session_id: str) -> RedisChatMessageHistory: """使用 Redis 存储,支持 TTL 自动过期""" return RedisChatMessageHistory( session_id=session_id, url="redis://localhost:6379/0", ttl=86400, # 历史保留 24 小时后自动删除(秒) key_prefix="chat:", # Redis key 前缀,格式:chat:{session_id} ) chain_with_history = RunnableWithMessageHistory( chain, get_redis_history, input_messages_key="input", history_messages_key="history", ) # 在分布式环境中,多个服务实例共享同一个 Redis, # 同一个 session_id 的请求无论落到哪个实例,都能拿到完整历史 config = {"configurable": {"session_id": "user_001"}} result = chain_with_history.invoke({"input": "你好"}, config=config)
当内置的存储方案不满足需求时(比如你的历史数据要存到 MongoDB 或自己的业务数据库),可以继承 BaseChatMessageHistory 实现自定义后端:
BaseChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.messages import BaseMessage, messages_from_dict, messages_to_dict from typing import List import json class MongoChatMessageHistory(BaseChatMessageHistory): """ 自定义 MongoDB 存储后端(示例,演示实现方式) """ def __init__(self, session_id: str, connection_string: str, database: str): self.session_id = session_id # 实际使用时初始化 pymongo 客户端 # self.client = MongoClient(connection_string) # self.collection = self.client[database]["chat_history"] self._messages: List[BaseMessage] = [] # 简化演示,用内存代替 @property def messages(self) -> List[BaseMessage]: """加载历史消息""" # 实际从 MongoDB 查询: # doc = self.collection.find_one({"session_id": self.session_id}) # if doc: # return messages_from_dict(doc["messages"]) return self._messages def add_message(self, message: BaseMessage) -> None: """保存一条消息""" self._messages.append(message) # 实际写入 MongoDB: # self.collection.update_one( # {"session_id": self.session_id}, # {"$push": {"messages": messages_to_dict([message])[0]}}, # upsert=True, # ) def clear(self) -> None: """清空历史""" self._messages = [] # self.collection.delete_one({"session_id": self.session_id}) def get_mongo_history(session_id: str) -> MongoChatMessageHistory: return MongoChatMessageHistory( session_id=session_id, connection_string="mongodb://localhost:27017", database="myapp", ) chain_with_history = RunnableWithMessageHistory( chain, get_mongo_history, input_messages_key="input", history_messages_key="history", )
SQLChatMessageHistory
RedisChatMessageHistory
随着对话轮数增加,历史消息越来越长,最终会超过模型的上下文窗口限制(如 GPT-4o 的 128K tokens)。即使没有超限,过长的历史也会大幅增加每次请求的 Token 成本。
trim_messages 是 LangChain 1.0 内置的消息裁剪工具,按 Token 数控制历史长度:
trim_messages
from langchain_core.messages import trim_messages, HumanMessage, AIMessage, SystemMessage from langchain_openai import ChatOpenAI llm = ChatOpenAI(model="gpt-4o") # 构建裁剪器 trimmer = trim_messages( max_tokens=2000, # 历史消息最多保留 2000 个 Token strategy="last", # "last":保留最近的消息;"first":保留最早的消息 token_counter=llm, # 用模型的 tokenizer 精确计算 Token 数 include_system=True, # 保留 SystemMessage(即使超限也不裁剪) allow_partial=False, # 不允许裁断单条消息(保证消息完整性) start_on="human", # 确保保留的第一条消息是用户消息(保证对话完整性) ) # 手动测试裁剪效果 messages = [ SystemMessage(content="你是一个助手"), HumanMessage(content="第一轮用户消息"), AIMessage(content="第一轮助手回复"), HumanMessage(content="第二轮用户消息"), AIMessage(content="第二轮助手回复"), HumanMessage(content="第三轮用户消息"), AIMessage(content="第三轮助手回复"), HumanMessage(content="当前问题"), ] trimmed = trimmer.invoke(messages) print(f"原始消息数:{len(messages)}") print(f"裁剪后消息数:{len(trimmed)}") # 根据 Token 数,只保留最近几轮和 SystemMessage
把裁剪逻辑嵌入 LCEL 链,让裁剪在每次请求时自动发生:
from langchain_core.messages import trim_messages from langchain_core.runnables import RunnablePassthrough, RunnableLambda from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.output_parsers import StrOutputParser from langchain_core.chat_history import InMemoryChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_openai import ChatOpenAI from operator import itemgetter llm = ChatOpenAI(model="gpt-4o") # 定义裁剪器 trimmer = trim_messages( max_tokens=4000, strategy="last", token_counter=llm, include_system=True, allow_partial=False, start_on="human", ) # 构建带裁剪的链 prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个有帮助的助手。"), MessagesPlaceholder(variable_name="history"), ("human", "{input}"), ]) # 在链中加入裁剪步骤:先裁剪历史,再注入 prompt chain = ( RunnablePassthrough.assign( # 从输入中取出 history 字段,经过裁剪后再放回去 history=itemgetter("history") | trimmer, ) | prompt | llm | StrOutputParser() ) # 包装成带历史的链 session_store = {} def get_session_history(session_id: str) -> InMemoryChatMessageHistory: if session_id not in session_store: session_store[session_id] = InMemoryChatMessageHistory() return session_store[session_id] chain_with_history = RunnableWithMessageHistory( chain, get_session_history, input_messages_key="input", history_messages_key="history", ) # 使用:和之前完全一样,裁剪在幕后自动进行 config = {"configurable": {"session_id": "user_001"}} for i in range(20): # 模拟长对话 reply = chain_with_history.invoke({"input": f"这是第 {i+1} 轮对话"}, config=config) print(f"第 {i+1} 轮回复:{reply[:50]}")
除了按 Token 裁剪,有时只需要按消息数量或类型过滤:
from langchain_core.messages import filter_messages, HumanMessage, AIMessage, SystemMessage messages = [ SystemMessage(content="你是助手", id="s1"), HumanMessage(content="问题1", id="h1"), AIMessage(content="回答1", id="a1"), HumanMessage(content="问题2", id="h2"), AIMessage(content="回答2", id="a2"), HumanMessage(content="问题3", id="h3"), AIMessage(content="回答3", id="a3"), ] # 只保留 HumanMessage human_only = filter_messages(messages, include_types=["human"]) print([m.content for m in human_only]) # ['问题1', '问题2', '问题3'] # 排除 SystemMessage no_system = filter_messages(messages, exclude_types=["system"]) print(len(no_system)) # 6 # 只保留特定 ID 的消息 specific = filter_messages(messages, include_ids=["h2", "a2", "h3"]) print([m.content for m in specific]) # ['问题2', '回答2', '问题3']
除了裁剪,另一种策略是把早期的对话历史"压缩"成一段摘要,保留语义信息的同时大幅减少 Token 消耗:
from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from typing import List from langchain_core.messages import BaseMessage llm = ChatOpenAI(model="gpt-4o") # 摘要生成链 summary_prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个对话摘要助手。请将以下对话历史压缩成一段简洁的摘要,保留关键信息。"), ("human", "对话历史:\n\n{conversation}\n\n请生成摘要:"), ]) summary_chain = summary_prompt | llm | StrOutputParser() class SummaryMemoryManager: """ 总结摘要记忆管理器 当历史超过阈值时,把早期历史压缩成摘要 """ def __init__(self, max_turns: int = 10, summary_turns: int = 6): """ max_turns: 最大保留轮数(超过后触发摘要) summary_turns: 每次摘要压缩的轮数 """ self.max_turns = max_turns self.summary_turns = summary_turns self.messages: List[BaseMessage] = [] self.summary: str = "" def add_exchange(self, human_msg: str, ai_msg: str): """添加一轮对话""" self.messages.append(HumanMessage(content=human_msg)) self.messages.append(AIMessage(content=ai_msg)) # 检查是否超过阈值 current_turns = len(self.messages) // 2 if current_turns >= self.max_turns: self._compress_history() def _compress_history(self): """把早期历史压缩成摘要""" # 取出要压缩的消息 compress_count = self.summary_turns * 2 to_compress = self.messages[:compress_count] self.messages = self.messages[compress_count:] # 保留剩余的 # 生成摘要 conversation_text = "\n".join([ f"{'用户' if isinstance(m, HumanMessage) else '助手'}:{m.content}" for m in to_compress ]) new_summary = summary_chain.invoke({"conversation": conversation_text}) # 合并摘要(如果已有摘要,合并两段) if self.summary: merge_input = f"原有摘要:{self.summary}\n\n新增内容:{new_summary}" self.summary = summary_chain.invoke({"conversation": merge_input}) else: self.summary = new_summary print(f"[Memory] 已压缩 {self.summary_turns} 轮历史,当前摘要:{self.summary[:60]}...") def get_context_messages(self) -> List[BaseMessage]: """获取用于注入 prompt 的上下文消息""" context = [] if self.summary: # 把摘要作为 SystemMessage 的一部分 context.append(SystemMessage(content=f"以下是之前对话的摘要:\n{self.summary}")) context.extend(self.messages) return context # 使用摘要记忆管理器 manager = SummaryMemoryManager(max_turns=8, summary_turns=4) # 模拟长对话 conversations = [ ("我是张伟,Python 工程师", "你好张伟!"), ("我在开发一个 RAG 系统", "RAG 是个很好的技术选择!"), ("我用的向量数据库是 Chroma", "Chroma 是个不错的选择"), ("遇到了检索精度问题", "可以尝试调整 chunk 大小"), ("chunk 大小设为多少比较好?", "通常 256-512 tokens 是个好的起点"), ("我试了 512,效果还不错", "很高兴听到有改善"), ("还有什么可以优化的?", "可以考虑使用重排序"), ("重排序怎么实现?", "可以用 Cohere 或 BGE Reranker"), ("BGE Reranker 怎么集成?", "可以通过 HuggingFace 加载..."), ] for human, ai in conversations: manager.add_exchange(human, ai) # 查看当前状态 context = manager.get_context_messages() print(f"\n当前上下文消息数:{len(context)}") if manager.summary: print(f"历史摘要:{manager.summary[:100]}...")
生产环境中,一个服务要同时处理来自不同用户的请求,Session 管理必须做到严格隔离。
from fastapi import FastAPI, Depends, Header from fastapi.responses import StreamingResponse from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.output_parsers import StrOutputParser from langchain_community.chat_message_histories import RedisChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory from typing import Optional import uuid app = FastAPI() # 初始化链 llm = ChatOpenAI(model="gpt-4o") prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个智能助手。"), MessagesPlaceholder(variable_name="history"), ("human", "{input}"), ]) chain = prompt | llm | StrOutputParser() def get_redis_history(session_id: str) -> RedisChatMessageHistory: return RedisChatMessageHistory( session_id=session_id, url="redis://localhost:6379/0", ttl=7 * 24 * 3600, # 7 天过期 ) chain_with_history = RunnableWithMessageHistory( chain, get_redis_history, input_messages_key="input", history_messages_key="history", ) # --------- 关键:session_id 的构成策略 --------- # 策略一:user_id(跨设备同一用户的历史一致) # 策略二:user_id + conversation_id(同一用户的不同对话独立) # 策略三:device_id + timestamp(设备级临时会话) def build_session_id(user_id: str, conversation_id: Optional[str] = None) -> str: """构建 session_id""" if conversation_id: return f"user:{user_id}:conv:{conversation_id}" return f"user:{user_id}:default" @app.post("/chat") async def chat( message: str, user_id: str = Header(...), # 从请求头获取用户 ID conversation_id: Optional[str] = Header(None), # 可选的对话 ID ): session_id = build_session_id(user_id, conversation_id) config = {"configurable": {"session_id": session_id}} result = await chain_with_history.ainvoke( {"input": message}, config=config, ) return {"reply": result, "session_id": session_id} @app.post("/chat/stream") async def chat_stream( message: str, user_id: str = Header(...), conversation_id: Optional[str] = Header(None), ): session_id = build_session_id(user_id, conversation_id) config = {"configurable": {"session_id": session_id}} async def generate(): async for chunk in chain_with_history.astream( {"input": message}, config=config, ): yield f"data: {chunk}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(generate(), media_type="text/event-stream") @app.delete("/chat/history/{user_id}") async def clear_history(user_id: str, conversation_id: Optional[str] = None): """清空指定用户的对话历史""" session_id = build_session_id(user_id, conversation_id) history = get_redis_history(session_id) history.clear() return {"message": f"已清空 session {session_id} 的历史"} @app.get("/chat/history/{user_id}") async def get_history(user_id: str, conversation_id: Optional[str] = None): """获取指定用户的对话历史""" session_id = build_session_id(user_id, conversation_id) history = get_redis_history(session_id) return { "session_id": session_id, "message_count": len(history.messages), "messages": [ {"role": msg.__class__.__name__, "content": msg.content} for msg in history.messages ], }
允许用户同时维护多个独立的对话(类似 ChatGPT 的侧边栏对话列表):
from datetime import datetime from typing import List, Dict import uuid class ConversationManager: """ 多对话管理器:支持创建、切换、删除对话 """ def __init__(self, user_id: str, chain_with_history, get_history_fn): self.user_id = user_id self.chain = chain_with_history self.get_history = get_history_fn self.conversations: Dict[str, dict] = {} # conv_id -> metadata def new_conversation(self, title: str = "") -> str: """创建新对话,返回 conversation_id""" conv_id = str(uuid.uuid4())[:8] self.conversations[conv_id] = { "id": conv_id, "title": title or f"对话 {len(self.conversations) + 1}", "created_at": datetime.now().isoformat(), "session_id": f"user:{self.user_id}:conv:{conv_id}", } return conv_id def chat(self, conv_id: str, message: str) -> str: """在指定对话中发送消息""" if conv_id not in self.conversations: raise ValueError(f"对话 {conv_id} 不存在") session_id = self.conversations[conv_id]["session_id"] config = {"configurable": {"session_id": session_id}} reply = self.chain.invoke({"input": message}, config=config) # 如果是第一条消息,用消息内容生成对话标题 history = self.get_history(session_id) if len(history.messages) == 2: # 刚保存了第一轮(human + ai) self.conversations[conv_id]["title"] = message[:20] + ("..." if len(message) > 20 else "") return reply def delete_conversation(self, conv_id: str): """删除对话及其历史""" if conv_id in self.conversations: session_id = self.conversations[conv_id]["session_id"] self.get_history(session_id).clear() del self.conversations[conv_id] def list_conversations(self) -> List[dict]: """列出所有对话""" return sorted( self.conversations.values(), key=lambda x: x["created_at"], reverse=True, ) def get_conversation_history(self, conv_id: str) -> list: """获取指定对话的历史""" session_id = self.conversations[conv_id]["session_id"] history = self.get_history(session_id) return [{"role": type(m).__name__, "content": m.content} for m in history.messages] # 使用示例 manager = ConversationManager("user_001", chain_with_history, get_redis_history) # 创建两个独立对话 conv1 = manager.new_conversation("技术讨论") conv2 = manager.new_conversation("日常聊天") # 在各自对话中交流 manager.chat(conv1, "我在学习 LangChain,有什么建议?") manager.chat(conv2, "今天天气真好") manager.chat(conv1, "LCEL 的核心是什么?") # 列出所有对话 for conv in manager.list_conversations(): print(f"[{conv['id']}] {conv['title']} - {conv['created_at']}") # 查看某个对话的历史 history = manager.get_conversation_history(conv1) for msg in history: print(f" {msg['role']}: {msg['content'][:50]}")
记忆模块和系统提示词结合,可以构建出"了解用户"的个性化助手:
from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.output_parsers import StrOutputParser from langchain_core.chat_history import InMemoryChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_core.runnables import RunnablePassthrough, RunnableLambda from langchain_core.messages import HumanMessage, AIMessage from typing import List import json llm = ChatOpenAI(model="gpt-4o") # 动态提取用户信息的辅助链 extract_user_info_prompt = ChatPromptTemplate.from_messages([ ("system", """从对话历史中提取用户的个人信息。 只输出 JSON,格式:{{"name": null, "occupation": null, "interests": [], "preferences": []}} 如果信息不存在则保留 null 或空列表。不要编造信息。"""), ("human", "对话历史:\n{history_text}\n\n请提取用户信息:"), ]) from langchain_core.output_parsers import JsonOutputParser extract_chain = extract_user_info_prompt | llm | JsonOutputParser() def extract_user_profile(messages: List) -> dict: """从历史消息中提取用户档案""" if not messages: return {} history_text = "\n".join([ f"{'用户' if isinstance(m, HumanMessage) else '助手'}:{m.content}" for m in messages[-10:] # 只看最近 10 条 ]) try: return extract_chain.invoke({"history_text": history_text}) except Exception: return {} # 个性化助手:System Prompt 会根据已知的用户信息动态调整 def build_personalized_chain(user_profile: dict = None): """根据用户档案构建个性化链""" profile_text = "" if user_profile: parts = [] if user_profile.get("name"): parts.append(f"用户姓名:{user_profile['name']}") if user_profile.get("occupation"): parts.append(f"职业:{user_profile['occupation']}") if user_profile.get("interests"): parts.append(f"兴趣爱好:{', '.join(user_profile['interests'])}") if parts: profile_text = "\n\n已知用户信息:\n" + "\n".join(parts) system_prompt = f"""你是一个智能个人助手,能记住并利用用户的信息提供个性化服务。{profile_text} 请根据已知信息,提供更有针对性的回答。如果用户透露了新信息,记住并自然地融入后续回答。""" prompt = ChatPromptTemplate.from_messages([ ("system", system_prompt), MessagesPlaceholder(variable_name="history"), ("human", "{input}"), ]) return prompt | llm | StrOutputParser() class PersonalizedChatbot: """带用户画像的个性化对话机器人""" def __init__(self): self.session_store = {} self.user_profiles = {} # 缓存用户档案 def get_history(self, session_id: str) -> InMemoryChatMessageHistory: if session_id not in self.session_store: self.session_store[session_id] = InMemoryChatMessageHistory() return self.session_store[session_id] def chat(self, user_id: str, message: str) -> str: # 获取当前历史 history = self.get_history(user_id) # 每 5 轮更新一次用户档案 current_turns = len(history.messages) // 2 if current_turns % 5 == 0 and current_turns > 0: self.user_profiles[user_id] = extract_user_profile(history.messages) print(f"[Profile] 更新用户档案:{self.user_profiles.get(user_id)}") # 根据用户档案构建个性化链 user_profile = self.user_profiles.get(user_id, {}) chain = build_personalized_chain(user_profile) chain_with_history = RunnableWithMessageHistory( chain, self.get_history, input_messages_key="input", history_messages_key="history", ) config = {"configurable": {"session_id": user_id}} return chain_with_history.invoke({"input": message}, config=config) # 使用 bot = PersonalizedChatbot() print(bot.chat("u001", "你好!我叫李华,是一名数据科学家")) print(bot.chat("u001", "我最近在学习 LangChain")) print(bot.chat("u001", "我喜欢下棋和读书")) print(bot.chat("u001", "有什么适合我的 AI 项目推荐?")) # 助手会利用已知的用户信息(数据科学家、学 LangChain、喜欢下棋读书)给出个性化推荐
如果你接手的是 LangChain 0.x 的老项目,代码里可能大量使用了已被标记为 Legacy 的 Memory 类。本节帮助你理解它们的等价替代方案。
这是最常见的迁移场景:
# ❌ 旧写法(Legacy,不推荐) from langchain.memory import ConversationBufferMemory from langchain.chains import ConversationChain from langchain_openai import ChatOpenAI llm = ChatOpenAI(model="gpt-4o") memory = ConversationBufferMemory() conversation = ConversationChain(llm=llm, memory=memory) result = conversation.predict(input="你好,我叫张伟") result = conversation.predict(input="我叫什么名字?") # ✅ 新写法(LangChain 1.0) from langchain_core.chat_history import InMemoryChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.output_parsers import StrOutputParser session_store = {} def get_history(session_id: str) -> InMemoryChatMessageHistory: if session_id not in session_store: session_store[session_id] = InMemoryChatMessageHistory() return session_store[session_id] prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个助手"), MessagesPlaceholder("history"), ("human", "{input}"), ]) chain = prompt | ChatOpenAI(model="gpt-4o") | StrOutputParser() chain_with_history = RunnableWithMessageHistory( chain, get_history, input_messages_key="input", history_messages_key="history", ) config = {"configurable": {"session_id": "default"}} chain_with_history.invoke({"input": "你好,我叫张伟"}, config=config) chain_with_history.invoke({"input": "我叫什么名字?"}, config=config)
旧版 ConversationBufferWindowMemory(k=5) 保留最近 5 轮对话,新版用 trim_messages 替代:
ConversationBufferWindowMemory(k=5)
# ❌ 旧写法 from langchain.memory import ConversationBufferWindowMemory memory = ConversationBufferWindowMemory(k=5) # 只保留最近 5 轮 # ✅ 新写法:用 trim_messages 精确控制 from langchain_core.messages import trim_messages # 按轮数裁剪(每轮 = 1 HumanMessage + 1 AIMessage = 2 条消息) def keep_last_n_turns(messages, n: int = 5): """保留最近 n 轮对话""" # 保留 SystemMessage + 最近 n*2 条消息 system_msgs = [m for m in messages if m.__class__.__name__ == "SystemMessage"] non_system = [m for m in messages if m.__class__.__name__ != "SystemMessage"] return system_msgs + non_system[-(n * 2):] # 或者用 trim_messages 按 Token 数裁剪(更推荐) trimmer = trim_messages( max_tokens=2000, strategy="last", token_counter=ChatOpenAI(model="gpt-4o"), include_system=True, )
# ❌ 旧写法 from langchain.memory import ConversationSummaryMemory from langchain_openai import ChatOpenAI memory = ConversationSummaryMemory(llm=ChatOpenAI()) # ✅ 新写法:参见第四节"总结摘要策略"的 SummaryMemoryManager 实现 # 核心思路不变:超过阈值时用 LLM 生成摘要,摘要作为 SystemMessage 注入
ConversationBufferWindowMemory(k=N)
ConversationSummaryBufferMemory
langchain_community
ConversationChain
在开发阶段,想知道每次请求实际发给模型的完整消息是什么:
from langchain_core.runnables import RunnableLambda from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_openai import ChatOpenAI from langchain_core.output_parsers import StrOutputParser from langchain_core.chat_history import InMemoryChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory def debug_messages(input_dict: dict) -> dict: """调试用:打印注入历史后的完整消息""" if "history" in input_dict: print(f"\n[DEBUG] 本次请求消息总数:{len(input_dict['history']) + 2}") # +2 是 system + human print(f"[DEBUG] 历史消息数:{len(input_dict['history'])}") for i, msg in enumerate(input_dict['history']): role = type(msg).__name__ preview = msg.content[:50] + "..." if len(msg.content) > 50 else msg.content print(f" [{i}] {role}: {preview}") return input_dict prompt = ChatPromptTemplate.from_messages([ ("system", "你是助手"), MessagesPlaceholder("history"), ("human", "{input}"), ]) chain = ( RunnableLambda(debug_messages) # 在 prompt 之前插入调试步骤 | prompt | ChatOpenAI(model="gpt-4o") | StrOutputParser() ) session_store = {} def get_history(sid): if sid not in session_store: session_store[sid] = InMemoryChatMessageHistory() return session_store[sid] chain_with_history = RunnableWithMessageHistory( chain, get_history, input_messages_key="input", history_messages_key="history", ) config = {"configurable": {"session_id": "debug_session"}} chain_with_history.invoke({"input": "第一轮"}, config=config) chain_with_history.invoke({"input": "第二轮"}, config=config) # [DEBUG] 本次请求消息总数:2 # [DEBUG] 历史消息数:0 # [DEBUG] 本次请求消息总数:4 # [DEBUG] 历史消息数:2 # [0] HumanMessage: 第一轮 # [1] AIMessage: ...
在某些场景下,需要手动干预历史记录(比如删除某条不合适的回复):
from langchain_core.messages import HumanMessage, AIMessage history = session_store.get("user_001") if history: print(f"当前历史:{len(history.messages)} 条消息") # 删除最后一轮(撤回上一次对话) if len(history.messages) >= 2: history.messages = history.messages[:-2] print("已删除最后一轮对话") # 手动注入一条背景信息 history.add_message(HumanMessage(content="[系统注入] 用户是 VIP 会员")) history.add_message(AIMessage(content="[系统注入] 好的,已记录用户为 VIP 会员"))
长对话会导致每次请求的 Token 消耗快速增长,需要监控:
from langchain_core.callbacks import UsageMetadataCallbackHandler from langchain_core.runnables.history import RunnableWithMessageHistory callback = UsageMetadataCallbackHandler() # 在每次 invoke 时传入 callback result = chain_with_history.invoke( {"input": "你好"}, config={ "configurable": {"session_id": "user_001"}, "callbacks": [callback], } ) print(f"本次消耗 Token:{callback.usage_metadata}") # {'input_tokens': 150, 'output_tokens': 50, 'total_tokens': 200} # 注意:随着历史增长,input_tokens 会线性增加,这就是为什么要做裁剪
这是初学者最常遇到的报错:
# ❌ 错误:variable_name 和 history_messages_key 名称不一致 prompt = ChatPromptTemplate.from_messages([ ("system", "你是助手"), MessagesPlaceholder(variable_name="chat_history"), # 注意:这里是 chat_history ("human", "{input}"), ]) chain_with_history = RunnableWithMessageHistory( chain, get_session_history, input_messages_key="input", history_messages_key="history", # ❌ 这里是 history,和上面的 chat_history 不一致! ) # ✅ 正确:两者名称必须完全一致 prompt = ChatPromptTemplate.from_messages([ ("system", "你是助手"), MessagesPlaceholder(variable_name="history"), # ✅ history ("human", "{input}"), ]) chain_with_history = RunnableWithMessageHistory( chain, get_session_history, input_messages_key="input", history_messages_key="history", # ✅ history,和上面一致 )
在多租户系统中,不同用户的 session_id 一旦冲突会导致严重的安全问题:
# ❌ 危险:只用数字 ID,容易碰撞 session_id = str(user_id) # "123" # ✅ 安全:加前缀和命名空间 session_id = f"app:v1:user:{user_id}:conv:{conv_id}" # 例如:"app:v1:user:12345:conv:abc123"
from langchain_core.messages import HumanMessage, AIMessage history = InMemoryChatMessageHistory() # add_message:一次添加一条消息 history.add_message(HumanMessage(content="你好")) history.add_message(AIMessage(content="你好!")) # add_messages:一次添加多条消息(批量) history.add_messages([ HumanMessage(content="问题"), AIMessage(content="回答"), ]) # 快捷方法 history.add_user_message("用户消息") # 等价于 add_message(HumanMessage(...)) history.add_ai_message("助手消息") # 等价于 add_message(AIMessage(...))
get_session_history 会在每次调用链时执行,里面的逻辑要尽量轻量:
get_session_history
# ❌ 反例:每次调用都建立数据库连接(耗时且浪费资源) def get_session_history(session_id: str): connection = create_db_connection() # 每次调用都建新连接! return CustomHistory(session_id, connection) # ✅ 正例:连接复用(连接池或单例) from functools import lru_cache db_pool = create_connection_pool() # 模块级别初始化 def get_session_history(session_id: str): return CustomHistory(session_id, db_pool) # 复用连接池
当 Memory 和 Agent 结合使用时,input_messages_key 和 output_messages_key 的配置会更复杂:
input_messages_key
output_messages_key
from langchain_core.runnables.history import RunnableWithMessageHistory # Agent 的输入/输出结构与普通链不同 # Agent 输入通常是 {"input": "...", "chat_history": [...]} # Agent 输出通常是 {"output": "..."} agent_with_history = RunnableWithMessageHistory( agent_executor, get_session_history, input_messages_key="input", history_messages_key="chat_history", output_messages_key="output", # Agent 的输出字段名 )
include_system=True
你现在应该能做到: - 用 RunnableWithMessageHistory 把任意 LCEL 链升级为多轮对话链 - 根据部署规模选择合适的存储后端(内存/SQLite/Redis) - 用 trim_messages 控制历史长度,避免上下文溢出和成本失控 - 在多用户场景下正确设计 session_id 策略,保证用户隔离 - 结合摘要压缩策略处理超长对话场景 - 在 FastAPI 中实现生产级的多用户流式对话接口
《工具调用——Tool 的定义、注册与执行》
掌握了记忆之后,下一篇进入智能体的核心——让模型调用工具。你会学到:
@tool
StructuredTool
bind_tools()
代码仓库:本系列所有可运行代码示例统一维护在 GitHub,每篇对应独立目录,可直接克隆运行。 系列导航:[第一篇] → [第二篇] → [第三篇] → [第四篇] → [第五篇] → 第六篇(当前) → 第七篇 → ...
代码仓库:本系列所有可运行代码示例统一维护在 GitHub,每篇对应独立目录,可直接克隆运行。
系列导航:[第一篇] → [第二篇] → [第三篇] → [第四篇] → [第五篇] → 第六篇(当前) → 第七篇 → ...
还没有评论,来抢沙发吧!
博客管理员
40 篇文章
还没有评论,来抢沙发吧!