Skip to content

持久化

LangGraph 内置了一个持久化层,通过检查点器实现。当你使用检查点器编译图时,检查点器会在每个超级步骤后保存一次图的状态快照(checkpoint)。这些快照会被保存到一个 线程(thread)中,在图执行后可以访问该线程。由于 线程 允许在图执行后访问其状态,因此包括人机交互、内存管理、时间旅行以及容错等在内的多种强大功能都变得可能。有关如何添加和使用检查点器的完整示例,请参阅此操作指南

检查点

LangGraph API 自动处理检查点

使用 LangGraph API 时,你不需要手动实现或配置检查点器。API 会为你自动处理所有持久化基础设施。

线程

线程是分配给每个检查点的独特ID或线程标识符,这些检查点是由检查点生成器保存的。在使用带有检查点生成器的图时,您**必须**在配置的configurable部分中指定一个thread_id

{"configurable": {"thread_id": "1"}}

检查点

检查点是每个超级步骤保存的图状态快照,并由StateSnapshot对象表示,具有以下关键属性:

  • config: 与此检查点关联的配置。
  • metadata: 与此检查点关联的元数据。
  • values: 在此时间点的状态通道值。
  • next: 图中要执行的下一个节点名称元组。
  • tasks: 包含有关要执行的下一个任务信息的PregelTask对象元组。如果该步骤之前已尝试过,则会包含错误信息。如果图在某个节点内被中断动态地,任务将包含与中断相关的附加数据。

让我们看看当一个简单的图被调用时,保存了哪些检查点:

API Reference: StateGraph | START | END | InMemorySaver

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)

运行图后,我们期望看到恰好4个检查点:

  • 空检查点,其中START为下一个要执行的节点
  • 检查点,用户输入为{'foo': '', 'bar': []},且node_a为下一个要执行的节点
  • 检查点,node_a的输出为{'foo': 'a', 'bar': ['a']},且node_b为下一个要执行的节点
  • 检查点,node_b的输出为{'foo': 'b', 'bar': ['a', 'b']},且没有下一个要执行的节点

注意,bar通道的值包含了来自两个节点的输出,因为我们为bar通道定义了一个聚合器。

获取状态

在与保存的图状态交互时,您**必须**指定一个线程标识符。通过调用graph.get_state(config)可以查看图的最新状态。这将返回一个StateSnapshot对象,对应于配置中提供的线程ID关联的最新检查点或提供检查点ID的线程关联的检查点。

# 获取最新的状态快照
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# 获取特定检查点ID的状态快照
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)

在我们的示例中,get_state的输出如下所示:

StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

获取状态历史记录

您可以通过调用graph.get_state_history(config)来获取给定线程的完整执行历史。这将返回一组与配置中提供的线程ID关联的StateSnapshot对象。重要的是,检查点按时间顺序排列,最新的检查点/StateSnapshot将是列表中的第一个。

config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))

在我们的示例中,get_state_history的输出如下所示:

[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']}, next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.819946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]

状态

回放

还可以回放先前的图执行。如果我们使用thread_idcheckpoint_id调用图,则我们将重新播放对应于checkpoint_id之前的步骤,并仅执行检查点之后的步骤。

  • thread_id 是线程的ID。
  • checkpoint_id 是指代线程内特定检查点的标识符。

在调用图时,必须将这些参数作为配置的一部分传递:

config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)

重要的是,LangGraph知道某个特定步骤是否已经执行过。如果是,则LangGraph将简单地重新播放该特定步骤,而不会重新执行它(仅限于checkpoint_id之前的步骤)。所有checkpoint_id之后的步骤都将被执行(即一个新的分支),即使它们之前已经被执行过。参见这个关于时间旅行的指南以了解更多关于回放的内容

回放

更新状态

除了从特定的检查点重新播放图之外,我们还可以编辑图状态。我们使用graph.update_state()方法来实现这一点。此方法接受三个不同的参数:

config

配置应包含指定要更新哪个线程的thread_id。当仅传递thread_id时,我们更新(或分支)当前状态。可选地,如果包括checkpoint_id字段,则我们分支选定的检查点。

values

这是用于更新状态的值。请注意,此更新被视为任何节点更新处理的一样。这意味着这些值将传递给聚合器函数,如果它们定义了图状态中的某些通道。这意味着update_state并不会自动覆盖每个通道的值,而是仅对没有聚合器的通道进行更新。让我们通过一个例子来说明。

假设您定义了图的状态如下(参见上面的完整示例):

from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

现在假设图的当前状态为

{"foo": 1, "bar": ["a"]}

如果您更新状态如下:

graph.update_state(config, {"foo": 2, "bar": ["b"]})

那么图的新状态将为:

{"foo": 2, "bar": ["a", "b"]}

foo键(通道)完全更改(因为没有为该通道指定聚合器,所以update_state会覆盖它)。然而,为bar键指定了聚合器,因此它将"b"追加到bar的状态中。

as_node

最后,在调用update_state时您可以选择性地指定as_node。如果提供了它,更新将被视为来自节点as_node。如果没有提供as_node,则将其设置为最后一个更新状态的节点,如果不确定则不设置。这很重要是因为接下来要执行的步骤取决于最后一个给出更新的节点,因此可以用来控制哪个节点将执行下一步。参见这个关于时间旅行的指南以了解更多关于分支状态的内容

更新

内存存储

共享状态模型

一个状态模式指定了在执行图时填充的一组键。如上所述,状态可以在每个图步骤中由检查点器写入线程,从而实现状态持久化。

但是,如果我们希望在所有线程之间保留某些信息呢?考虑聊天机器人的场景,在这种情况下,我们希望跨所有与该用户相关的聊天会话(即线程)保留特定的用户信息!

仅使用检查点器无法在不同线程之间共享信息。这促使了对Store接口的需求。作为示例,我们可以定义一个InMemoryStore来存储用户跨线程的信息。我们只需像以前一样编译我们的图,并使用新的in_memory_store变量即可。

LangGraph API 自动处理存储

使用 LangGraph API 时,您不需要手动实现或配置存储。API 会在后台自动处理所有存储基础设施。

基本用法

首先,让我们在不使用 LangGraph 的情况下展示这一功能。

from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()

记忆是通过一个元组命名空间化的,例如在本例中将是(<user_id>, "memories")。命名空间可以是任意长度,并表示任何东西,不一定与用户相关。

user_id = "1"
namespace_for_memory = (user_id, "memories")

我们使用store.put方法将记忆保存到存储中的命名空间。当我们这样做时,我们指定上述定义的命名空间以及记忆的键值对:键只是一个唯一标识记忆的标识符(memory_id),而值(字典)则是记忆本身。

memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)

我们可以使用store.search方法读取命名空间中的记忆。此方法将返回给定用户的全部记忆列表。最新的记忆是列表中的最后一个元素。

memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2bkj34-1631-47b7-b15f-65515d4c1843',
 'updated_at': '2bkj34-1631-47b7-b15f-65515d4c1843'}

每种记忆类型都是一个具有特定属性的 Python 类(Item)。我们可以通过转换为字典来访问它,如上所示。 它具有的属性包括:

  • value: 此记忆本身的值(也是一个字典)
  • key: 此命名空间中此记忆的唯一键
  • namespace: 此记忆类型的字符串列表
  • created_at: 此记忆创建的时间戳
  • updated_at: 此记忆更新的时间戳

语义搜索

除了简单的检索之外,存储还支持语义搜索,允许您根据意义而不是精确匹配来查找记忆。为了启用这一点,您可以使用嵌入模型配置存储:

API Reference: init_embeddings

from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),  # 嵌入提供者
        "dims": 1536,                              # 嵌入维度
        "fields": ["food_preference", "$"]              # 要嵌入的字段
    }
)

现在在搜索时,您可以使用自然语言查询来找到相关记忆:

# 查找关于食物偏好的记忆
# (这可以在将记忆放入存储后完成)
memories = store.search(
    namespace_for_memory,
    query="What does the user like to eat?",
    limit=3  # 返回前三个匹配项
)

您可以控制哪些部分的记忆被嵌入,通过配置fields参数或在存储记忆时指定index参数:

# 存储具有特定字段以嵌入
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {
        "food_preference": "I love Italian cuisine",
        "context": "Discussing dinner plans"
    },
    index=["food_preference"]  # 只嵌入“food_preferences”字段
)

# 存储时不嵌入(仍可检索,但不可搜索)
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {"system_info": "Last updated: 2024-01-01"},
    index=False
)

在 LangGraph 中使用

有了这些设置,我们在 LangGraph 中使用in_memory_storein_memory_store与检查点器协同工作:检查点器将状态保存到线程中,如上所述,而in_memory_store允许我们在不同线程之间存储任意信息。我们如下编译图,同时使用检查点器和in_memory_store

API Reference: InMemorySaver

from langgraph.checkpoint.memory import InMemorySaver

# 我们需要这个是因为我们想要启用线程(对话)
checkpointer = InMemorySaver()

# ... 定义图 ...

# 编译图,同时使用检查点器和存储
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)

我们使用thread_id调用图,就像之前一样,同时使用user_id,我们将使用它来命名空间化此特定用户的记忆,如上所示。

# 调用图
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}

# 首先让我们向AI打招呼
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
    print(update)

我们可以在任何节点中访问in_memory_storeuser_id,通过传递store: BaseStoreconfig: RunnableConfig作为节点参数。这是我们在节点中使用语义搜索来查找相关记忆的方法:

def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # 从配置中获取用户ID
    user_id = config["configurable"]["user_id"]

    # 命名空间化记忆
    namespace = (user_id, "memories")

    # ... 分析对话并创建新记忆

    # 创建一个新的记忆ID
    memory_id = str(uuid.uuid4())

    # 我们创建一个新的记忆
    store.put(namespace, memory_id, {"memory": memory})

如上所示,我们也可以在任何节点中访问存储,并使用store.search方法获取记忆。回忆一下,记忆是以对象的形式返回的,可以转换为字典。

memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}

我们可以访问这些记忆并在模型调用中使用它们。

def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # 从配置中获取用户ID
    user_id = config["configurable"]["user_id"]

    # 命名空间化记忆
    namespace = (user_id, "memories")

    # 根据最近的消息进行搜索
    memories = store.search(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    info = "\n".join([d.value["memory"] for d in memories])

    # ... 在模型调用中使用记忆

如果我们创建了一个新的线程,只要user_id相同,我们仍然可以访问相同的记忆。

# 调用图
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# 让我们再次打招呼
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
    print(update)

当我们使用 LangGraph 平台时,无论是本地(例如在 LangGraph Studio 中)还是使用 LangGraph Cloud,基础存储默认可用且无需在图编译期间指定。要启用语义搜索,您确实需要在您的langgraph.json文件中配置索引设置。例如:

{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}

有关更多详细信息和配置选项,请参阅部署指南

检查点库

在底层,检查点功能由符合基础检查点保存器接口的检查点对象提供支持。LangGraph 提供了几个检查点实现,所有这些实现都是通过独立安装的库来完成的:

  • langgraph-checkpoint:基础检查点保存器接口(BaseCheckpointSaver)和序列化/反序列化接口(SerializerProtocol)。包括内存中的检查点实现(InMemorySaver)用于实验。LangGraph 包含了 langgraph-checkpoint
  • langgraph-checkpoint-sqlite:使用 SQLite 数据库(SqliteSaver/AsyncSqliteSaver)的 LangGraph 检查点实现。适用于实验和本地工作流。需要单独安装。
  • langgraph-checkpoint-postgres:使用 Postgres 数据库(PostgresSaver/AsyncPostgresSaver)的高级检查点实现,用于 LangGraph 云。适用于生产环境。需要单独安装。

检查点接口

每个检查点都遵循基础检查点保存器接口并实现以下方法:

  • .put - 存储带有配置和元数据的检查点。
  • .put_writes - 存储与检查点关联的中间写入(即pending writes)。
  • .get_tuple - 使用给定配置(thread_idcheckpoint_id)获取检查点元组。这用于填充 graph.get_state() 中的 StateSnapshot
  • .list - 列出匹配给定配置和过滤条件的检查点。这用于填充 graph.get_state_history() 中的状态历史记录。

如果检查点器用于异步图执行(即通过 .ainvoke, .astream, .abatch 执行图),则会使用上述方法的异步版本(.aput, .aput_writes, .aget_tuple, .alist)。

注意

对于异步运行图,您可以使用 InMemorySaver 或 Sqlite/Postgres 检查点器的异步版本 -- AsyncSqliteSaver / AsyncPostgresSaver

序列化器

当检查点器保存图状态时,它们需要对状态中的通道值进行序列化。这是通过序列化器对象完成的。 langgraph-checkpoint 定义了实现序列化器的协议(protocol),并提供了默认实现(JsonPlusSerializer),该实现可以处理各种类型,包括 LangChain 和 LangGraph 基本类型、日期时间、枚举等。

功能

人机协作

首先,检查点器通过允许人类检查、中断和批准图步骤来促进人机协作工作流。这些工作流需要检查点器,因为人类必须能够在任何时候查看图的状态,并且图必须能够在人类对状态做出更新后继续执行。具体示例请参阅这些操作指南

记忆

其次,检查点器允许在交互之间实现"记忆"。在多次人类交互(如对话)的情况下,任何后续消息都可以发送到该线程,该线程将保留之前消息的记忆。有关如何使用检查点器添加和管理对话记忆的端到端示例,请参阅此操作指南

时间旅行

第三,检查点器允许实现"时间旅行",使用户能够重播先前的图执行以审查和/或调试特定图步骤。此外,检查点器使得可以在任意检查点处分叉图状态,以便探索替代轨迹。

故障容错

最后,检查点还提供了故障容错和错误恢复:如果一个或多个节点在一个给定的超级步中失败,您可以从最后一个成功的步骤重新启动图。另外,当图中的一个节点在一个给定的超级步中执行中途失败时,LangGraph会存储在同一超级步中成功完成的其他节点的待处理检查点写入,这样每当从该超级步重新开始图执行时,我们不会重新运行那些成功的节点。

待处理写入

此外,当图中的一个节点在一个给定的超级步中执行中途失败时,LangGraph会存储在同一超级步中成功完成的其他节点的待处理检查点写入,这样每当从该超级步重新开始图执行时,我们不会重新运行那些成功的节点。

Comments