2.2. ChatMessageHistory 是构建自定义记忆管道的基座
ChatMessageHistory 是构建自定义记忆管道的基座
当你为 Agent 接入记忆、想为每次对话留下完整审计轨迹,或需要为上百个租户提供完全隔离的会话上下文时,只靠 RunnableWithMessageHistory 那层“自动存取”已经不够用了。你需要直接面对 LangChain 记忆体系中最底层、最灵活的那块积木——ChatMessageHistory。本章就带你拆解它的内部工作流,再亲手实现一套可持久化、可审计的自定义记忆管道。
你需要什么
- Python 3.9+ 与虚拟环境
langchain、langchain-community、langchain-core(截至 2026 年 6 月,0.2.x 系列稳定)- Redis 服务(可用 Docker 快速启动:
docker run -p 6379:6379 redis:7) redisPython 客户端:pip install redis- 预计时间:45 分钟(概念理解 15 分钟 + 编码与测试 30 分钟)
最终成果
你将得到一个可直接应用于生产环境的记忆管理方案:
- 一个可继承
BaseChatMessageHistory的自定义类,底层使用 Redis 存储,天然支持多租户 Session 隔离。 - 一套回调审计模组,通过 LangChain 的
BaseCallbackHandler自动记录每一次记忆读写,形成可供追溯的日志。 - 一个完整的测试脚本,验证消息在不同 Session 之间互不干扰。
这一切都源于对 ChatMessageHistory 这一“记忆原子”的透彻掌控。
步骤一:用 ChatMessageHistory 还原消息流转
先别急着写定制代码,我们先直接使用 LangChain 内置的 ChatMessageHistory,看看消息是怎么被记录和消费的。
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
# 创建一个内存中的历史记录器
history = ChatMessageHistory()
# 模拟一轮对话:用户提问 → 模型回复 → 工具调用结果
history.add_user_message("帮我查一下今天的天气") # 记录用户消息
history.add_ai_message("好的,我来调用天气 API") # 记录 AI 思考/动作
history.add_message(ToolMessage( # 直接添加工具返回
content="北京,晴,25°C",
tool_call_id="weather_001"
))
# 查看完整消息列表
for msg in history.messages:
print(f"{msg.type}: {msg.content[:20]}...")
预期结果:输出三行记录,类型分别为 human、ai、tool,这与 Agent 实际运行的消息流完全一致。
踩坑提醒
ChatMessageHistory是final类——你不能继承它来增加持久化逻辑。它的使命是提供一个纯净的内存实现,所有高级定制都应当继承它的抽象父类BaseChatMessageHistory来完成(下一步就会这么做)。
如果你把上面这段代码打包成一个 Chain 或 Agent,LangChain 在运行时正是通过 ChatMessageHistory 的 add_user_message / add_ai_message 等方法逐步追加消息的。理解了这一步,你就抓住了所有记忆模块的消息流转中枢。
步骤二:自定义 RedisChatMessageHistory 实现多租户隔离
生产环境不允许一重启就丢失所有对话,也不允许租户 A 看到租户 B 的聊天记录。因此我们需要一个既能持久化、又能按 Session 隔离的 BaseChatMessageHistory 实现。
关键设计点
- 使用
session_id作为 Redis Key 的前缀,例如chat:history:{session_id}。 - 利用
langchain_core.messages中的message_to_dict和messages_from_dict完成序列化与反序列化。 - 消息列表以 JSON 数组形式存入 Redis String,每次
add_messages后整体写回。
编写自定义类
import json
import redis
from typing import List, Optional
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict
class RedisChatMessageHistory(BaseChatMessageHistory):
"""基于 Redis 的多租户聊天历史,每个 session_id 对应一个独立历史。"""
def __init__(self, session_id: str, redis_client: redis.Redis):
self.session_id = session_id
self.redis = redis_client
self._key = f"chat:history:{session_id}"
@property
def messages(self) -> List[BaseMessage]:
# 从 Redis 读取序列化数据,转换为消息对象列表
raw = self.redis.get(self._key)
if raw is None:
return []
items = json.loads(raw)
# 注意:这里务必使用 messages_from_dict,而非自己解析
return messages_from_dict(items)
def add_messages(self, messages: List[BaseMessage]) -> None:
# 获取当前历史,追加新消息,再写回 Redis
current = self.messages
current.extend(messages)
serialized = [message_to_dict(msg) for msg in current]
self.redis.set(self._key, json.dumps(serialized, ensure_ascii=False))
def clear(self) -> None:
self.redis.delete(self._key)
代码注释要点:
message_to_dict返回的字典会包含type、content以及可能的tool_calls等字段,LangChain 的反序列化工具就是靠type来还原正确的消息子类。- 我们用
messages_from_dict而不是手动构造对象,这样即使未来增加了新消息类型,代码也依然兼容。
测试多租户隔离
client = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 租户 A 的会话
history_a = RedisChatMessageHistory(session_id="user-101", redis_client=client)
history_a.add_messages([HumanMessage(content="我的订单什么时候到?")])
history_a.add_messages([AIMessage(content="您的订单预计明天送达")])
# 租户 B 的会话
history_b = RedisChatMessageHistory(session_id="user-202", redis_client=client)
history_b.add_messages([HumanMessage(content="帮我推荐一款手机")])
history_b.add_messages([AIMessage(content="您的预算是多少?")])
# 重新加载 A 的历史,确认没有被 B 污染
loaded_a = RedisChatMessageHistory(session_id="user-101", redis_client=client)
print(f"A 的历史数量: {len(loaded_a.messages)}") # 应为 2
print(f"A 的第一条消息: {loaded_a.messages[0].content}")
预期结果:输出 A 的历史数量: 2,且内容正确,证明 Session 隔离生效。
踩坑清单
- 序列化遗漏
type:如果手动提取{"role": ..., "content": ...}而不保留完整的字典结构,messages_from_dict将无法重建消息对象,导致类型丢失。- Redis 键冲突:切勿在多个不相关的应用中复用相同 Key 前缀,建议加入业务标识,例如
myapp:chat:history:{id}。- 高并发写覆盖:简单
get→append→set存在竞态,生产环境可改用 Redis List +RPUSH/LRANGE原子操作,或用分布式锁保护。
步骤三:用 CallbackHandler 实现记忆操作审计
透明化的上下文审计要求我们能够记录每一次记忆的写入,包括谁在何时触发了什么消息。LangChain 的回调系统恰好可以在这个层面埋点。
我们将创建一个自定义的 BaseCallbackHandler,在 Agent 或 Chain 开始调用模型前(此时输入消息已经被读取并传入),以及调用结束后(此时输出消息已生成并即将被写入记忆),采集关键信息并写入审计日志。
import logging
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
# 配置一个简单的文件日志
logging.basicConfig(filename='memory_audit.log', level=logging.INFO)
class MemoryAuditCallback(BaseCallbackHandler):
"""审计回调:记录模型输入(读记忆)和输出(即将写入记忆的消息)"""
def on_chat_model_start(
self,
serialized: dict,
messages: list[list[BaseMessage]],
**kwargs,
) -> None:
# messages 是一个二维列表,通常第一维代表多个样例,我们取第一个
for msg_list in messages:
for msg in msg_list:
logging.info(f"[READ] {msg.type}: {msg.content[:50]}...")
def on_llm_end(self, response, **kwargs) -> None:
# response 中包含生成的消息,这些消息随后会被记忆组件写入
for gen in response.generations:
for choice in gen:
text = choice.text if hasattr(choice, 'text') else str(choice.message.content)
logging.info(f"[WRITE] ai: {text[:50]}...")
将回调接入 Chain
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 构造一条简单的 Chain(仅作演示,实际 Agent 同理)
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个助手"),
("human", "{input}")
])
# 假设 chat 是你已经实例化的聊天模型
chain = prompt | chat | StrOutputParser()
# 配置回调
config = {"callbacks": [MemoryAuditCallback()]}
# 执行一次对话
result = chain.invoke({"input": "今天股市怎么样?"}, config)
预期结果:运行后,memory_audit.log 文件中会先后出现类似如下的记录:
[READ] human: 今天股市怎么样?...
[WRITE] ai: 对不起,我无法提供实时股市数据...
这样一来,每次记忆被消费和产生的内容都有迹可循,为后续的合规审查、调试与用户投诉处理提供了坚实依据。
回顾
在本章中,你依次完成了三件事:
- 从
ChatMessageHistory的原始用法入手,摸清了 Agent 内部HumanMessage、AIMessage、ToolMessage的流转规律。 - 绕过
ChatMessageHistory不可继承的限制,通过继承BaseChatMessageHistory打造了基于 Redis 的多租户记忆支架,并验证了 Session 隔离性。 - 利用
BaseCallbackHandler插入了审计钩子,实现记忆读写行为的全量日志记录。
从初识“内存原子”到生产级记忆管道,整体耗时约 45 分钟。
行动清单
- [ ] 在你的项目中用
RedisChatMessageHistory替换掉默认的内存历史,完成一次重启后历史不丢失的检验。 - [ ] 根据实际并发量评估 Redis 的原子写入方案(
RPUSH/LRANGE),必要时加入分布式锁。 - [ ] 将
MemoryAuditCallback集成到你的主要 Agent 路由中,确保所有记忆操作都可追溯。 - [ ] 为审计日志增加
user_id、session_id等业务字段,方便与运维系统打通。
掌握了 LangChain 的底层记忆接口,你就拥有了“完全定制上下文治理”的能力。不过,这只是记忆管理的其中一条技术路线。下一章我们将转向 LlamaIndex 的记忆管理,看看它在知识密集型智能体中提供了哪些 LangChain 所不擅长的能力——比如内置的聊天引擎记忆与索引结构的深度耦合。这一对比将帮助你在不同场景中做出最佳的框架选型。
上下文治理:AI Agent 系统设计
关于 LearnKu