2.2. ChatMessageHistory 是构建自定义记忆管道的基座

ChatMessageHistory 是构建自定义记忆管道的基座

当你为 Agent 接入记忆、想为每次对话留下完整审计轨迹,或需要为上百个租户提供完全隔离的会话上下文时,只靠 RunnableWithMessageHistory 那层“自动存取”已经不够用了。你需要直接面对 LangChain 记忆体系中最底层、最灵活的那块积木——ChatMessageHistory。本章就带你拆解它的内部工作流,再亲手实现一套可持久化、可审计的自定义记忆管道。


你需要什么

  • Python 3.9+ 与虚拟环境
  • langchainlangchain-communitylangchain-core(截至 2026 年 6 月,0.2.x 系列稳定)
  • Redis 服务(可用 Docker 快速启动:docker run -p 6379:6379 redis:7
  • redis Python 客户端:pip install redis
  • 预计时间:45 分钟(概念理解 15 分钟 + 编码与测试 30 分钟)

最终成果

你将得到一个可直接应用于生产环境的记忆管理方案:

  • 一个可继承 BaseChatMessageHistory 的自定义类,底层使用 Redis 存储,天然支持多租户 Session 隔离。
  • 一套回调审计模组,通过 LangChain 的 BaseCallbackHandler 自动记录每一次记忆读写,形成可供追溯的日志。
  • 一个完整的测试脚本,验证消息在不同 Session 之间互不干扰。

这一切都源于对 ChatMessageHistory 这一“记忆原子”的透彻掌控。


步骤一:用 ChatMessageHistory 还原消息流转

先别急着写定制代码,我们先直接使用 LangChain 内置的 ChatMessageHistory,看看消息是怎么被记录和消费的。

from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage

# 创建一个内存中的历史记录器
history = ChatMessageHistory()

# 模拟一轮对话:用户提问 → 模型回复 → 工具调用结果
history.add_user_message("帮我查一下今天的天气")   # 记录用户消息
history.add_ai_message("好的,我来调用天气 API")    # 记录 AI 思考/动作
history.add_message(ToolMessage(                      # 直接添加工具返回
    content="北京,晴,25°C",
    tool_call_id="weather_001"
))

# 查看完整消息列表
for msg in history.messages:
    print(f"{msg.type}: {msg.content[:20]}...")

预期结果:输出三行记录,类型分别为 humanaitool,这与 Agent 实际运行的消息流完全一致。

踩坑提醒
ChatMessageHistoryfinal 类——你不能继承它来增加持久化逻辑。它的使命是提供一个纯净的内存实现,所有高级定制都应当继承它的抽象父类 BaseChatMessageHistory 来完成(下一步就会这么做)。

如果你把上面这段代码打包成一个 Chain 或 Agent,LangChain 在运行时正是通过 ChatMessageHistoryadd_user_message / add_ai_message 等方法逐步追加消息的。理解了这一步,你就抓住了所有记忆模块的消息流转中枢


步骤二:自定义 RedisChatMessageHistory 实现多租户隔离

生产环境不允许一重启就丢失所有对话,也不允许租户 A 看到租户 B 的聊天记录。因此我们需要一个既能持久化、又能按 Session 隔离的 BaseChatMessageHistory 实现。

关键设计点

  • 使用 session_id 作为 Redis Key 的前缀,例如 chat:history:{session_id}
  • 利用 langchain_core.messages 中的 message_to_dictmessages_from_dict 完成序列化与反序列化。
  • 消息列表以 JSON 数组形式存入 Redis String,每次 add_messages 后整体写回。

编写自定义类

import json
import redis
from typing import List, Optional
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict

class RedisChatMessageHistory(BaseChatMessageHistory):
    """基于 Redis 的多租户聊天历史,每个 session_id 对应一个独立历史。"""

    def __init__(self, session_id: str, redis_client: redis.Redis):
        self.session_id = session_id
        self.redis = redis_client
        self._key = f"chat:history:{session_id}"

    @property
    def messages(self) -> List[BaseMessage]:
        # 从 Redis 读取序列化数据,转换为消息对象列表
        raw = self.redis.get(self._key)
        if raw is None:
            return []
        items = json.loads(raw)
        # 注意:这里务必使用 messages_from_dict,而非自己解析
        return messages_from_dict(items)

    def add_messages(self, messages: List[BaseMessage]) -> None:
        # 获取当前历史,追加新消息,再写回 Redis
        current = self.messages
        current.extend(messages)
        serialized = [message_to_dict(msg) for msg in current]
        self.redis.set(self._key, json.dumps(serialized, ensure_ascii=False))

    def clear(self) -> None:
        self.redis.delete(self._key)

代码注释要点

  • message_to_dict 返回的字典会包含 typecontent 以及可能的 tool_calls 等字段,LangChain 的反序列化工具就是靠 type 来还原正确的消息子类。
  • 我们用 messages_from_dict 而不是手动构造对象,这样即使未来增加了新消息类型,代码也依然兼容。

测试多租户隔离

client = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 租户 A 的会话
history_a = RedisChatMessageHistory(session_id="user-101", redis_client=client)
history_a.add_messages([HumanMessage(content="我的订单什么时候到?")])
history_a.add_messages([AIMessage(content="您的订单预计明天送达")])

# 租户 B 的会话
history_b = RedisChatMessageHistory(session_id="user-202", redis_client=client)
history_b.add_messages([HumanMessage(content="帮我推荐一款手机")])
history_b.add_messages([AIMessage(content="您的预算是多少?")])

# 重新加载 A 的历史,确认没有被 B 污染
loaded_a = RedisChatMessageHistory(session_id="user-101", redis_client=client)
print(f"A 的历史数量: {len(loaded_a.messages)}")   # 应为 2
print(f"A 的第一条消息: {loaded_a.messages[0].content}")

预期结果:输出 A 的历史数量: 2,且内容正确,证明 Session 隔离生效。

踩坑清单

  • 序列化遗漏 type:如果手动提取 {"role": ..., "content": ...} 而不保留完整的字典结构,messages_from_dict 将无法重建消息对象,导致类型丢失。
  • Redis 键冲突:切勿在多个不相关的应用中复用相同 Key 前缀,建议加入业务标识,例如 myapp:chat:history:{id}
  • 高并发写覆盖:简单 getappendset 存在竞态,生产环境可改用 Redis List + RPUSH / LRANGE 原子操作,或用分布式锁保护。

步骤三:用 CallbackHandler 实现记忆操作审计

透明化的上下文审计要求我们能够记录每一次记忆的写入,包括谁在何时触发了什么消息。LangChain 的回调系统恰好可以在这个层面埋点。

我们将创建一个自定义的 BaseCallbackHandler,在 Agent 或 Chain 开始调用模型前(此时输入消息已经被读取并传入),以及调用结束后(此时输出消息已生成并即将被写入记忆),采集关键信息并写入审计日志。

import logging
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage

# 配置一个简单的文件日志
logging.basicConfig(filename='memory_audit.log', level=logging.INFO)

class MemoryAuditCallback(BaseCallbackHandler):
    """审计回调:记录模型输入(读记忆)和输出(即将写入记忆的消息)"""

    def on_chat_model_start(
        self,
        serialized: dict,
        messages: list[list[BaseMessage]],
        **kwargs,
    ) -> None:
        # messages 是一个二维列表,通常第一维代表多个样例,我们取第一个
        for msg_list in messages:
            for msg in msg_list:
                logging.info(f"[READ] {msg.type}: {msg.content[:50]}...")

    def on_llm_end(self, response, **kwargs) -> None:
        # response 中包含生成的消息,这些消息随后会被记忆组件写入
        for gen in response.generations:
            for choice in gen:
                text = choice.text if hasattr(choice, 'text') else str(choice.message.content)
                logging.info(f"[WRITE] ai: {text[:50]}...")

将回调接入 Chain

from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 构造一条简单的 Chain(仅作演示,实际 Agent 同理)
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个助手"),
    ("human", "{input}")
])
# 假设 chat 是你已经实例化的聊天模型
chain = prompt | chat | StrOutputParser()

# 配置回调
config = {"callbacks": [MemoryAuditCallback()]}

# 执行一次对话
result = chain.invoke({"input": "今天股市怎么样?"}, config)

预期结果:运行后,memory_audit.log 文件中会先后出现类似如下的记录:

[READ] human: 今天股市怎么样?...
[WRITE] ai: 对不起,我无法提供实时股市数据...

这样一来,每次记忆被消费和产生的内容都有迹可循,为后续的合规审查、调试与用户投诉处理提供了坚实依据。


回顾

在本章中,你依次完成了三件事:

  1. ChatMessageHistory 的原始用法入手,摸清了 Agent 内部 HumanMessageAIMessageToolMessage 的流转规律。
  2. 绕过 ChatMessageHistory 不可继承的限制,通过继承 BaseChatMessageHistory 打造了基于 Redis 的多租户记忆支架,并验证了 Session 隔离性。
  3. 利用 BaseCallbackHandler 插入了审计钩子,实现记忆读写行为的全量日志记录。

从初识“内存原子”到生产级记忆管道,整体耗时约 45 分钟


行动清单

  • [ ] 在你的项目中用 RedisChatMessageHistory 替换掉默认的内存历史,完成一次重启后历史不丢失的检验。
  • [ ] 根据实际并发量评估 Redis 的原子写入方案(RPUSH / LRANGE),必要时加入分布式锁。
  • [ ] 将 MemoryAuditCallback 集成到你的主要 Agent 路由中,确保所有记忆操作都可追溯。
  • [ ] 为审计日志增加 user_idsession_id 等业务字段,方便与运维系统打通。

掌握了 LangChain 的底层记忆接口,你就拥有了“完全定制上下文治理”的能力。不过,这只是记忆管理的其中一条技术路线。下一章我们将转向 LlamaIndex 的记忆管理,看看它在知识密集型智能体中提供了哪些 LangChain 所不擅长的能力——比如内置的聊天引擎记忆与索引结构的深度耦合。这一对比将帮助你在不同场景中做出最佳的框架选型。

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 查看所有版本


暂无话题~