Skip to content

持久执行

持久执行 是一种技术,其中进程或工作流在关键点保存其进度,允许它暂停并在以后从上次停止的地方继续执行。这在需要人机交互的场景中特别有用,在这些场景中用户可以检查、验证或修改流程,然后再继续。此外,持久执行还适用于可能遇到中断或错误(例如调用LLM时超时)的长时间运行任务。通过保存已完成的工作,持久执行使进程能够在不重新处理先前步骤的情况下恢复执行——即使经过了较长时间的延迟(例如一周后)。

LangGraph内置的persistence层为工作流提供了持久执行功能,确保每个执行步骤的状态都被保存到持久存储中。这一能力保证了如果工作流因系统故障或其他原因(包括人机交互互动)而被中断,它可以从中断前的状态恢复执行。

Tip

如果您使用的是带有检查点器的LangGraph,则持久执行已经启用。您可以随时暂停和恢复工作流,即使在中断或失败之后。 要充分利用持久执行,确保您的工作流设计为确定性idempotent,并将任何副作用或非确定性操作封装在tasks中。您可以从StateGraph (Graph API)Functional API中使用tasks

系统要求

要在LangGraph中利用持久执行功能,你需要:

  1. 在工作流中启用持久化,通过指定一个检查点库来保存工作流进度。
  2. 在执行工作流时指定一个线程标识符。这将跟踪特定实例的工作流执行历史。
  3. 将任何非确定性操作(如随机数生成)或具有副作用的操作(如文件写入、API调用)包裹在tasks中,以确保当工作流被恢复时,这些操作不会重复执行,而是从持久层中检索其结果。有关更多信息,请参见确定性和一致重放

确定性和一致重播

当你恢复一个工作流运行时,代码并不会从执行停止的**同一行代码**继续;相反,它会确定一个合适的开始点来继续执行。这意味着工作流将从开始点重新播放所有步骤,直到达到停止的位置。

因此,在编写持久执行的工作流时,必须将任何非确定性操作(例如随机数生成)以及具有副作用的操作(例如文件写入、API 调用)封装在tasksnodes中。

为了确保你的工作流是确定性的,并且可以被一致地重播,请遵循以下指南:

  • 避免重复工作:如果一个nodes包含多个具有副作用的操作(例如日志记录、文件写入或网络调用),则将每个操作封装在一个单独的**任务**中。这确保了当工作流被恢复时,这些操作不会被重复执行,并且它们的结果可以从持久层中检索。
  • 封装非确定性操作:将可能产生非确定性结果的任何代码(例如随机数生成)封装在**任务**或**节点**中。这确保了在恢复时,工作流将按照记录的确切步骤顺序执行,并且具有相同的结果。
  • 使用幂等操作:尽可能确保具有副作用的操作(例如API调用、文件写入)是幂等的。这意味着如果操作在工作流失败后被重试,则其效果与第一次执行时相同。这对于导致数据写入的操作尤为重要。如果一个**任务**开始但未能成功完成,工作流的恢复将重新运行该**任务**,依赖于记录的结果以保持一致性。使用幂等键或验证现有结果以避免意外重复,确保工作流执行顺畅且可预测。

有关要避免的一些陷阱示例,请参阅功能API中的常见陷阱部分,其中展示了如何通过使用**任务**来结构化代码以避免这些问题。同样的原则适用于状态图(图形API)

在节点中使用任务

如果一个节点包含多个操作,您可能会发现将其每个操作转换为一个**任务**比将其重构为单独的节点更简单。

from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# 定义一个TypedDict来表示状态
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """示例节点,用于发出API请求。"""
    result = requests.get(state['url']).text[:100]  # 副作用
    return {
        "result": result
    }

# 创建一个StateGraph构建器,并添加一个调用call_api函数的节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 将开始和结束节点连接到call_api节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定一个检查点器
checkpointer = MemorySaver()

# 使用检查点器编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义一个带有线程ID的配置。
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"url": "https://www.example.com"}, config)
from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import task
from langgraph.graph import StateGraph, START, END
import requests

# 定义一个TypedDict来表示状态
class State(TypedDict):
    urls: list[str]
    result: NotRequired[list[str]]

@task
def _make_request(url: str):
    """发出请求。"""
    return requests.get(url).text[:100]

def call_api(state: State):
    """示例节点,用于发出API请求。"""
    requests = [_make_request(url) for url in state['urls']]
    results = [request.result() for request in requests]
    return {
        "results": results
    }

# 创建一个StateGraph构建器,并添加一个调用call_api函数的节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 将开始和结束节点连接到call_api节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定一个检查点器
checkpointer = MemorySaver()

# 使用检查点器编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义一个带有线程ID的配置。
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"urls": ["https://www.example.com"]}, config)

恢复工作流

一旦你在工作流中启用了持久执行,你就可以在以下场景中恢复执行:

  • 暂停和恢复工作流: 使用interrupt函数在特定点暂停工作流,并使用Command原语以更新的状态恢复它。更多详情请参阅人机交互
  • 从故障中恢复: 在异常发生后(例如LLM提供商中断),自动从最后一个成功的检查点恢复工作流。这涉及到通过提供一个None作为输入值来用相同的线程标识符执行工作流(参见此示例中的函数API)。

重新启动工作流的起点

  • 如果您使用的是状态图(图形API),则起点是执行停止的节点的开始位置。
  • 如果您在一个节点内部调用子图,则起点将是调用了被中断的子图的**父**节点。 在子图内部,起点将是执行停止的具体节点
  • 如果您使用的是函数式API,则起点是执行停止的入口点的开始位置。

Comments