功能性API¶
概述¶
**函数式API**允许您在对现有代码进行最小更改的情况下,向应用程序添加LangGraph的关键功能——持久性、内存、人机交互和流处理。
它被设计为可以集成到使用标准语言原语(如if
语句、for
循环和函数调用)进行分支和控制流的现有代码中。与许多要求将代码重构为显式管道或DAG的数据编排框架不同,函数式API允许您在不强制执行严格的执行模型的情况下,将这些功能融入其中。
函数式API使用两个关键构建模块:
@entrypoint
- 标记一个函数作为工作流程的起点,封装逻辑并管理执行流程,包括处理长时间运行的任务和中断。@task
- 表示一个独立的工作单元,如API调用或数据处理步骤,可以在入口点内异步执行。任务返回一个类似于未来的对象,该对象可以等待或同步解析。
这提供了一个最小的抽象层,用于构建具有状态管理和流处理的工作流程。
提示
对于偏好更声明式方法的用户,LangGraph的图API允许您使用图范式定义工作流程。这两个API共享相同的底层运行时,因此您可以将其结合在同一应用程序中使用。 请参阅函数式API与图API部分以比较这两种范式。
示例¶
以下我们演示了一个简单的应用程序,该程序撰写了一篇关于给定主题的文章,并中断以请求人类审核。
API Reference: MemorySaver | entrypoint | task | interrupt
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
@task
def write_essay(topic: str) -> str:
"""撰写一篇关于给定主题的文章。"""
time.sleep(1) # 一个长时间任务的占位符。
return f"一篇关于主题:{topic}的文章"
@entrypoint(checkpointer=MemorySaver())
def workflow(topic: str) -> dict:
"""一个简单的流程,撰写文章并请求审核。"""
essay = write_essay("猫").result()
is_approved = interrupt({
# 任何可以序列化的 JSON 负载都可以作为中断参数提供。
# 当从工作流中流式传输数据时,它将在客户端显示为中断。
"essay": essay, # 我们希望审核的文章。
# 我们可以添加任何额外的信息。
# 例如,引入一个名为 "action" 的键,并附带一些说明。
"action": "请批准/拒绝这篇文章",
})
return {
"essay": essay, # 生成的文章
"is_approved": is_approved, # 来自人工介入的响应
}
详细解释
此工作流将撰写一篇关于“猫”的主题文章,然后暂停以获取来自人类的审核。工作流可以在无限时间内被中断,直到提供审核为止。
当工作流恢复运行时,它会从开始执行,但由于 write_essay
任务的结果已经被保存,因此任务结果将从检查点加载而不是重新计算。
import time
import uuid
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
@task
def write_essay(topic: str) -> str:
"""撰写一篇关于给定主题的文章。"""
time.sleep(1) # 这是长时间任务的一个占位符。
return f"一篇关于主题:{topic}的文章"
@entrypoint(checkpointer=MemorySaver())
def workflow(topic: str) -> dict:
"""一个简单的流程,撰写文章并请求审核。"""
essay = write_essay("猫").result()
is_approved = interrupt({
# 任何可以序列化的 JSON 负载都可以作为中断参数提供。
# 当从工作流中流式传输数据时,它将在客户端显示为中断。
"essay": essay, # 我们希望审核的文章。
# 我们可以添加任何额外的信息。
# 例如,引入一个名为 "action" 的键,并附带一些说明。
"action": "请批准/拒绝这篇文章",
})
return {
"essay": essay, # 生成的文章
"is_approved": is_approved, # 来自人工介入的响应
}
thread_id = str(uuid.uuid4())
config = {
"configurable": {
"thread_id": thread_id
}
}
for item in workflow.stream("猫", config):
print(item)
{'write_essay': '一篇关于主题:猫的文章'}
{'__interrupt__': (Interrupt(value={'essay': '一篇关于主题:猫的文章', 'action': '请批准/拒绝这篇文章'}, resumable=True, ns=['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'], when='during'),)}
文章已经撰写完成,现在准备审核。一旦提供了审核,我们可以继续执行工作流:
from langgraph.types import Command
# 从用户(例如通过UI)获取审核
# 在这种情况下,我们使用的是布尔值,但这里可以是任何JSON可序列化值。
human_review = True
for item in workflow.stream(Command(resume=human_review), config):
print(item)
工作流已完成,并且审核已添加到文章中。
入口点¶
@entrypoint
装饰器可用于从函数创建工作流。它封装了工作流逻辑,并管理执行流程,包括处理*长时间运行的任务*和中断。
定义¶
一个**入口点**是通过使用 @entrypoint
装饰器来装饰一个函数定义的。
该函数**必须接受一个位置参数**,作为工作流输入。如果需要传递多个数据项,请使用字典作为第一个参数的数据类型。
用 entrypoint
装饰一个函数会产生一个Pregel
实例,帮助管理工作流的执行(例如,处理流式传输、恢复和检查点)。
通常,您希望向 @entrypoint
装饰器传递一个**检查点器**以启用持久性并使用诸如**人机交互**等功能。
序列化
入口点的**输入**和**输出**必须是JSON可序列化的,以支持检查点。请参阅序列化部分以获取更多详细信息。
可注入参数¶
在声明一个 entrypoint
时,您可以请求访问在运行时自动注入的附加参数。这些参数包括:
参数 | 描述 |
---|---|
previous | 访问与给定线程相关的先前 checkpoint 的状态。详见状态管理。 |
store | BaseStore 的实例。对长期记忆有用。 |
writer | 用于流式传输自定义数据,将自定义数据写入 custom 流。对流式传输自定义数据有用。 |
config | 用于访问运行时配置。详见RunnableConfig 获取更多信息。 |
重要事项
使用适当的名称和类型注释声明参数。
请求可注入参数
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore(...) # InMemoryStore 的实例用于长期记忆
@entrypoint(
checkpointer=checkpointer, # 指定检查点器
store=in_memory_store # 指定存储
)
def my_workflow(
some_input: dict, # 输入(例如,通过 `invoke` 传递)
*,
previous: Any = None, # 短期记忆
store: BaseStore, # 长期记忆
writer: StreamWriter, # 流式传输自定义数据
config: RunnableConfig # 访问传递给入口点的配置
) -> ...:
执行¶
使用@entrypoint
会生成一个Pregel
对象,可以使用invoke
、ainvoke
、stream
和astream
方法执行。
恢复¶
在中断后恢复执行可以通过将一个**恢复**值传递给命令原语来完成。
错误后的恢复
要在一个错误后恢复执行,可以在具有相同**线程ID**(配置)的情况下,使用None
运行entrypoint
。
这假设底层**错误**已经解决,执行可以成功继续。
状态管理¶
当一个 entrypoint
定义了一个 checkpointer
时,它会在同一**线程ID**上的连续调用之间存储信息,在检查点中。
这允许使用 previous
参数从上一次调用中访问状态。
默认情况下,previous
参数是前一次调用的返回值。
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> int:
previous = previous or 0
return number + previous
config = {
"configurable": {
"thread_id": "some_thread_id"
}
}
my_workflow.invoke(1, config) # 1 (previous was None)
my_workflow.invoke(2, config) # 3 (previous was 1 from the previous invocation)
entrypoint.final
¶
entrypoint.final 是一个特殊的原语,可以从入口点返回,并允许**解耦**保存在检查点中的值与**入口点的返回值**。
第一个值是入口点的返回值,第二个值是在检查点中保存的值。类型注释为 entrypoint.final[返回类型, 保存类型]
。
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
previous = previous or 0
# 这将返回前一个值给调用者,将 2 * number 保存到检查点中,
# 在下一次调用中用于 `previous` 参数。
return entrypoint.final(value=previous, save=2 * number)
config = {
"configurable": {
"thread_id": "1"
}
}
my_workflow.invoke(3, config) # 0 (previous was None)
my_workflow.invoke(1, config) # 6 (previous was 3 * 2 from the previous invocation)
任务¶
一个**任务**代表了一个离散的工作单元,比如API调用或数据处理步骤。它具有两个关键特性:
- 异步执行:任务被设计为异步执行,允许多个操作并发运行而不阻塞。
- 检查点保存:任务结果会被保存到检查点中,从而可以从上次保存的状态继续工作流程。(有关更多细节,请参阅persistence)
定义¶
任务使用@task
装饰器定义,该装饰器包装了一个普通的Python函数。
API Reference: task
from langgraph.func import task
@task()
def slow_computation(input_value):
# 模拟长时间运行的操作
...
return result
序列化
为了支持检查点保存,任务的**输出**必须是JSON可序列化的。
执行¶
任务**只能从**入口点、另一个**任务**或一个状态图节点内部调用。
任务**不能**直接从主应用程序代码中调用。
当你调用一个**任务**时,它会立即返回一个未来对象。未来对象是一个结果的占位符,该结果将在稍后可用。
要获取一个**任务**的结果,你可以同步等待(使用result()
)或异步等待(使用await
)。
何时使用任务¶
**任务**在以下场景中很有用:
- 检查点保存:当你需要将长时间运行操作的结果保存到检查点时,以便在恢复工作流时不需要重新计算。
- 人机交互:如果你正在构建一个需要人工干预的工作流,你必须使用**任务**来封装任何随机性(例如API调用),以确保工作流可以正确恢复。有关更多详细信息,请参阅determinism部分。
- 并行执行:对于I/O密集型任务,**任务**允许并行执行,使多个操作可以同时运行而不被阻塞(例如调用多个API)。
- 可观测性:将操作封装在**任务**中提供了一种跟踪工作流进度并使用LangSmith监控单个操作执行的方法。
- 可重试工作:当需要重试工作以处理故障或不一致情况时,**任务**提供了一种封装和管理重试逻辑的方法。
序列化¶
在LangGraph中,序列化的两个关键方面如下:
@entrypoint
输入和输出必须是JSON可序列化的。@task
输出必须是JSON可序列化的。
这些要求是为了启用检查点(checkpoint)和工作流恢复。使用Python的基本类型,如字典、列表、字符串、数字和布尔值,以确保输入和输出是可序列化的。
序列化确保了工作流状态,如任务结果和中间值,可以可靠地保存和恢复。这对于启用人机交互、容错性和并行执行至关重要。
提供非序列化的输入或输出将会导致配置了检查点的工作流在运行时出错。
确定性¶
为了利用诸如**人机交互**等特性,任何随机性都应被封装在**任务**中。这保证了当执行暂停(例如,进行人机交互)后再恢复时,会遵循相同的*步骤序列*,即使**任务**结果是非确定性的。
LangGraph 通过在执行过程中持久化**任务**和子图的结果来实现这种行为。一个设计良好的工作流确保了恢复执行时会遵循相同的*步骤序列*,允许正确地检索之前计算的结果而无需重新执行它们。这对于长时间运行的**任务**或具有非确定性结果的**任务**特别有用,因为它避免了重复之前的工作,并允许从本质上相同的地方继续执行。
虽然不同运行的工作流可能会产生不同的结果,但恢复特定运行时应该始终遵循相同的记录步骤序列。这使得 LangGraph 能够高效地查找在中断之前已经执行过的**任务**和**子图**的结果,并避免重新计算它们。
自洽性¶
自洽性确保了运行同一操作多次会产生相同的结果。这有助于防止重复的API调用,并在由于失败而重新运行某个步骤时避免冗余处理。始终将API调用放在**任务**函数中以实现检查点功能,并设计这些任务在重新执行时能够保持自洽性。如果一个**任务**开始但未能成功完成,则可能会发生重新执行。然后,如果工作流被恢复,该**任务**将再次运行。使用自洽性键或验证现有结果以避免重复。
IMG_PLACEHOLDER_1
函数式API与图API¶
函数式API 和图API(状态图)提供了两种不同的范式来使用LangGraph创建应用程序。以下是两者的一些关键区别:
- 控制流:函数式API不需要考虑图结构。可以使用标准的Python构造来定义工作流程。这通常会减少你需要编写的代码量。
- 状态管理:图API 需要声明一个状态并且可能需要定义reducer来管理图状态的更新。
@entrypoint
和@tasks
不需要显式的状态管理,因为它们的状态范围限于函数内部,并不会跨函数共享。 - 检查点:两个API都会生成并使用检查点。在**图API**中,每个超级步骤之后都会生成一个新的检查点。而在**函数式API**中,当任务被执行时,其结果会被保存到与给定入口点相关的现有检查点中,而不是创建新的检查点。
- 可视化:图API使得容易地将工作流程以图的形式进行可视化,这对于调试、理解工作流程以及与他人分享都非常有用。而函数式API不支持可视化,因为图是在运行时动态生成的。
常见陷阱¶
处理副作用¶
将副作用(例如写入文件、发送电子邮件)封装在任务中,以确保在恢复工作流时不会执行多次。
在此示例中,一个副作用(写入文件)直接包含在工作流中,因此当恢复工作流时,它会被第二次执行。
在此示例中,副作用被封装在一个任务中,以确保在恢复时能够一致地执行。
非确定性控制流¶
可能会每次给出不同结果的操作(如获取当前时间或随机数)应该被封装在任务中,以确保在恢复时返回相同的结果。
- 在一个任务中:获取随机数(5)→ 中断 → 恢复 → (再次返回5)→ ...
- 不在一个任务中:获取随机数(5)→ 中断 → 恢复 → 获取新的随机数(7)→ ...
这在使用**人机交互**工作流并有多个中断调用的情况下尤为重要。LangGraph为每个任务/入口点维护一个恢复值列表。当遇到中断时,它会与相应的恢复值匹配。这种匹配是严格基于**索引**的,因此恢复值的顺序应与中断的顺序相匹配。
如果恢复时执行顺序无法保持一致,则一个interrupt
调用可能会与错误的resume
值匹配,导致结果不正确。
请参阅关于确定性的部分以获取更多细节。
在此示例中,工作流使用当前时间来决定执行哪个任务。这是非确定性的,因为工作流的结果取决于其执行的时间。
from langgraph.func import entrypoint
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
t0 = inputs["t0"]
# 高亮下一行
t1 = time.time()
delta_t = t1 - t0
if delta_t > 1:
result = slow_task(1).result()
value = interrupt("question")
else:
result = slow_task(2).result()
value = interrupt("question")
return {
"result": result,
"value": value
}
在此示例中,工作流使用输入t0
来决定执行哪个任务。这是确定性的,因为工作流的结果仅依赖于输入。
import time
from langgraph.func import task
# 高亮下一行
@task
# 高亮下一行
def get_time() -> float:
return time.time()
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
t0 = inputs["t0"]
# 高亮下一行
t1 = get_time().result()
delta_t = t1 - t0
if delta_t > 1:
result = slow_task(1).result()
value = interrupt("question")
else:
result = slow_task(2).result()
value = interrupt("question")
return {
"result": result,
"value": value
}
模式¶
以下是一些简单的模式示例,展示了如何使用**函数API**。
在定义一个entrypoint
时,输入被限制为函数的第一个参数。要传递多个输入,可以使用字典。
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
value = inputs["value"]
another_value = inputs["another_value"]
...
my_workflow.invoke({"value": 1, "another_value": 2})
并行执行¶
任务可以通过并发调用并等待结果来并行执行。这在I/O绑定任务(例如调用LLM的API)中很有用,可以提高性能。
@task
def add_one(number: int) -> int:
return number + 1
@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[str]:
futures = [add_one(i) for i in numbers]
return [f.result() for f in futures]
调用子图¶
函数API 和 图API 可以在同一应用程序中一起使用,因为它们共享相同的底层运行时环境。
API Reference: entrypoint | StateGraph
from langgraph.func import entrypoint
from langgraph.graph import StateGraph
builder = StateGraph()
...
some_graph = builder.compile()
@entrypoint()
def some_workflow(some_input: dict) -> int:
# 调用使用图API定义的图
result_1 = some_graph.invoke(...)
# 调用另一个使用图API定义的图
result_2 = another_graph.invoke(...)
return {
"result_1": result_1,
"result_2": result_2
}
调用其他入口点¶
您可以在一个**入口点**或**任务**内部调用其他**入口点**。
@entrypoint() # 将自动使用父入口点中的检查点器
def some_other_workflow(inputs: dict) -> int:
return inputs["value"]
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
value = some_other_workflow.invoke({"value": 1})
return value
流式传输自定义数据¶
您可以使用StreamWriter
类型从**入口点**流式传输自定义数据。这允许您将自定义数据写入custom
流。
API Reference: MemorySaver | entrypoint | task | StreamWriter
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter
@task
def add_one(x):
return x + 1
@task
def add_two(x):
return x + 2
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter) -> int:
"""一个简单的流程,向数字添加一和二。"""
writer("hello") # 将一些数据写入`custom`流
add_one(inputs['number']).result() # 将数据写入`updates`流
writer("world") # 将更多数据写入`custom`流
add_two(inputs['number']).result() # 将数据写入`updates`流
return 5
config = {
"configurable": {
"thread_id": "1"
}
}
for chunk in main.stream({"number": 1}, stream_mode=["custom", "updates"], config=config):
print(chunk)
('updates', {'add_one': 2})
('updates', {'add_two': 3})
('custom', 'hello')
('custom', 'world')
('updates', {'main': 5})
Important
writer
参数将在运行时自动注入。只有当函数签名中出现与该名称完全一致的参数名时,才会注入。
重试策略¶
API Reference: MemorySaver | entrypoint | task | RetryPolicy
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy
attempts = 0
# 让我们配置重试策略以在ValueError上重试。
# 默认的重试策略是针对特定网络错误进行优化的。
retry_policy = RetryPolicy(retry_on=ValueError)
@task(retry=retry_policy)
def get_info():
global attempts
attempts += 1
if attempts < 2:
raise ValueError('Failure')
return "OK"
checkpointer = MemorySaver()
@entrypoint(checkpointer=checkpointer)
def main(inputs, writer):
return get_info().result()
config = {
"configurable": {
"thread_id": "1"
}
}
main.invoke({'any_input': 'foobar'}, config=config)
错误后的恢复¶
API Reference: MemorySaver | entrypoint | task | StreamWriter
import time
from langgraph.checkpoint.memory import MemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter
# 这个变量只是为了演示目的而模拟网络故障。
# 在实际代码中不会出现这种情况。
attempts = 0
@task()
def get_info():
"""
模拟一个在第一次尝试失败后成功执行的任务。
在第一次尝试时抛出异常,然后在后续尝试时返回"OK"。
"""
global attempts
attempts += 1
if attempts < 2:
raise ValueError("Failure") # 模拟第一次尝试时的失败
return "OK"
# 初始化一个内存检查点器用于持久化
checkpointer = MemorySaver()
@task
def slow_task():
"""
通过引入1秒延迟来模拟慢速运行的任务。
"""
time.sleep(1)
return "Ran slow task."
@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter):
"""
主工作流函数,按顺序执行慢速任务和获取信息任务。
参数:
- inputs: 包含工作流输入值的字典。
- writer: 用于流式传输自定义数据的StreamWriter。
工作流首先执行`slow_task`,然后尝试执行`get_info`,
哪个将在第一次调用时失败。
"""
slow_task_result = slow_task().result() # 阻塞调用`slow_task`
get_info().result() # 异常将在第一次尝试时在这里抛出
return slow_task_result
# 工作流执行配置带有唯一的线程标识符
config = {
"configurable": {
"thread_id": "1" # 唯一标识符以跟踪工作流执行
}
}
# 此调用将花费约1秒,由于`slow_task`执行
try:
# 第一次调用将由于`get_info`任务失败而抛出异常
main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
pass # 优雅地处理失败
当我们恢复执行时,不需要重新运行slow_task
,因为其结果已经保存在检查点中。
人机交互¶
功能API支持使用interrupt
函数和Command
原语的人机交互工作流。
请参阅以下示例以了解更多信息:
- 如何等待用户输入(功能API):展示如何使用功能API实现简单的“人机交互”工作流。
- 如何审查工具调用(功能API):指南演示了如何使用LangGraph功能API在ReAct代理中实现“人机交互”工作流。
短期记忆¶
使用**前一个**参数,并可选地使用entrypoint.final
原语的状态管理可用于实现短期记忆。
请参阅以下操作指南以了解更多信息:
- 如何添加线程级持久性(功能API):展示如何将线程级持久性添加到功能API工作流中,并实现一个简单的聊天机器人。
长期记忆¶
长期记忆允许跨不同**线程ID**存储信息。这可能在给定用户的对话中学到信息并在另一对话中使用时非常有用。
请参阅以下操作指南以了解更多信息:
- 如何添加跨线程持久性(功能API):展示如何将跨线程持久性添加到功能API工作流中,并实现一个简单的聊天机器人。
工作流¶
- 工作流和代理指南提供了更多使用功能API构建工作流的例子。
代理¶
- 如何从头开始创建React代理(功能API):展示如何使用功能API从头开始创建一个简单的React代理。
- 如何构建多代理网络:展示如何使用功能API构建多代理网络。
- 如何在多代理应用中添加多轮对话(功能API):允许最终用户与一个或多个代理进行多轮对话。