如何为并行节点执行创建分支¶
并行执行节点对于加快整体图操作速度至关重要。LangGraph 提供了对节点并行执行的本地支持,这可以显著提高基于图的工作流的性能。这种并行化是通过扇出和扇入机制实现的,利用了标准边缘和条件边。以下是几个示例,展示了如何添加适合您的分支数据流。
设置环境¶
首先,让我们安装所需的包
为LangGraph开发设置LangSmith
注册LangSmith可以快速发现并解决您的LangGraph项目中的问题,并提高其性能。通过使用跟踪数据,您可以调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的信息,请参阅这里。
如何并行运行图节点¶
在这个示例中,我们从Node A
分发到B和C
,然后汇聚到D
。通过我们的状态,我们指定了reducer的加法操作。这将合并或累积特定键在状态中的值,而不是简单地覆盖现有值。对于列表来说,这意味着将新列表与现有列表连接起来。有关使用reducer更新状态的更多细节,请参阅此指南。
API Reference: StateGraph | START | END
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
print(f'Adding "A" to {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Adding "B" to {state["aggregate"]}')
return {"aggregate": ["B"]}
def c(state: State):
print(f'Adding "C" to {state["aggregate"]}')
return {"aggregate": ["C"]}
def d(state: State):
print(f'Adding "D" to {state["aggregate"]}')
return {"aggregate": ["D"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
通过reducer,你可以看到每个节点中添加的值是被累积起来的。
Note
在上述示例中,节点 "b"
和 "c"
在同一个超级步骤中并发执行。由于它们处于同一步骤,因此节点 "d"
在节点 "b"
和 "c"
完成后执行。
重要的是,并行超级步骤中的更新可能不会按一致的顺序进行。如果您需要从并行超级步骤中以一致且预定的顺序获取更新,则应将输出写入状态中的单独字段,并与用于排序的值一起保存。
异常处理?
LangGraph 在“超级步”中执行节点,这意味着虽然并行分支可以并行执行,但整个超级步是事务性的。如果这些分支中的任何一个抛出异常,则没有更新会被应用到状态(整个超级步失败)。
重要的是,在使用检查点器时,成功节点在超级步中的结果会被保存,并且在恢复时不会重复执行。
如果您有容易出错的情况(或许想要处理不可靠的API调用),LangGraph 提供了两种方法来解决这个问题:- 您可以在节点中编写常规的Python代码以捕获和处理异常。
- 您可以设置一个重试策略,以便指示图重试抛出特定类型异常的节点。只有失败的分支会被重试,因此您无需担心执行冗余工作。
并行节点扇出和扇入带额外步骤¶
上面的例子展示了当每条路径只有一个步骤时如何进行扇出和扇入。但如果一条路径包含多个步骤会怎样呢?让我们在“b”分支中添加一个节点 b_2
:
def b_2(state: State):
print(f'Adding "B_2" to {state["aggregate"]}')
return {"aggregate": ["B_2"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge(["b_2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "B_2" to ['A', 'B', 'C']
Adding "D" to ['A', 'B', 'C', 'B_2']
Note
在上面的例子中,节点 "b"
和 "c"
在同一个超级步骤中并发执行。那么下一步会发生什么?
我们使用 add_edge(["b_2", "c"], "d")
这里强制节点 "d"
只在节点 "b_2"
和 "c"
都完成执行后才运行。如果我们添加两条独立的边,
节点 "d"
将会运行两次:一次是在节点 b2
完成后,另一次是在节点 c
完成后(无论这两个节点以何种顺序完成)。
条件分支¶
如果你的分发不是确定性的,你可以直接使用add_conditional_edges。
API Reference: StateGraph | START | END
import operator
from typing import Annotated, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
aggregate: Annotated[list, operator.add]
# Add a key to the state. We will set this key to determine
# how we branch.
which: str
def a(state: State):
print(f'Adding "A" to {state["aggregate"]}')
return {"aggregate": ["A"]}
def b(state: State):
print(f'Adding "B" to {state["aggregate"]}')
return {"aggregate": ["B"]}
def c(state: State):
print(f'Adding "C" to {state["aggregate"]}')
return {"aggregate": ["C"]}
def d(state: State):
print(f'Adding "D" to {state["aggregate"]}')
return {"aggregate": ["D"]}
def e(state: State):
print(f'Adding "E" to {state["aggregate"]}')
return {"aggregate": ["E"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_node(e)
builder.add_edge(START, "a")
def route_bc_or_cd(state: State) -> Sequence[str]:
if state["which"] == "cd":
return ["c", "d"]
return ["b", "c"]
intermediates = ["b", "c", "d"]
builder.add_conditional_edges(
"a",
route_bc_or_cd,
intermediates,
)
for node in intermediates:
builder.add_edge(node, "e")
builder.add_edge("e", END)
graph = builder.compile()
下一步¶
- 继续学习图API基础指南。
- 学习如何创建map-reduce分支,在这些分支中,不同的状态可以分发到节点的多个实例中。