Skip to content

如何使用 NodeInterrupt 动态添加断点

Note

对于**人机交互**工作流,请使用新的interrupt()函数来实现**人机交互**工作流。请参阅人机交互概念指南以获取有关使用interrupt的设计模式的更多信息。

先决条件

本指南假设您熟悉以下概念:

人机交互(HIL)互动对于代理系统至关重要。断点是一种常见的HIL交互模式,允许图在特定步骤处停止并寻求人类批准后再继续执行(例如,对于敏感操作)。

在LangGraph中,您可以在节点执行之前或之后添加断点。但是,通常情况下,根据某些条件从给定节点内部动态中断图可能是有用的。在这种情况下,包含有关为什么触发该中断的信息也可能很有帮助。

本指南展示了如何使用NodeInterrupt动态中断图——这是一个可以从节点内部抛出的特殊异常。让我们看看它的实际应用!

环境搭建

首先,让我们安装所需的包

pip install -U langgraph

为LangGraph开发设置LangSmith

注册LangSmith可以快速发现并解决您的LangGraph项目中的问题,并提高其性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的信息,请参阅这里

定义图

API Reference: StateGraph | START | END | MemorySaver

from typing_extensions import TypedDict
from IPython.display import Image, display

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt


class State(TypedDict):
    input: str


def step_1(state: State) -> State:
    print("---Step 1---")
    return state


def step_2(state: State) -> State:
    # Let's optionally raise a NodeInterrupt
    # if the length of the input is longer than 5 characters
    if len(state["input"]) > 5:
        raise NodeInterrupt(
            f"Received input that is longer than 5 characters: {state['input']}"
        )

    print("---Step 2---")
    return state


def step_3(state: State) -> State:
    print("---Step 3---")
    return state


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with memory
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))

使用动态中断运行图

首先,让我们用一个长度小于等于5个字符的输入来运行图。这应该会安全地忽略我们定义的中断条件,并在图执行结束时返回原始输入。

initial_input = {"input": "hello"}
thread_config = {"configurable": {"thread_id": "1"}}

for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello'}
---Step 1---
{'input': 'hello'}
---Step 2---
{'input': 'hello'}
---Step 3---
{'input': 'hello'}
如果我们此时检查图形,可以看到没有更多的任务等待运行,并且该图确实完成了执行。

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
()
()
现在,让我们使用一个长度超过5个字符的输入来运行该图。这应该会触发我们通过在step_2节点内部抛出NodeInterrupt错误而定义的动态中断。

initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "2"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}
我们可以看到,图形在执行step_2时停止了。如果我们检查此时的图状态,可以看到关于下一个要执行的节点(step_2)的信息,以及哪个节点引发了中断(同样是step_2),还有关于中断的其他相关信息。

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
('step_2',)
(PregelTask(id='365d4518-bcff-5abd-8ef5-8a0de7f510b0', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),)),)
如果我们尝试从断点处恢复图形,由于我们的输入和图形状态没有发生变化,我们将再次中断。

# NOTE: to resume the graph from a dynamic interrupt we use the same syntax as with regular interrupts -- we pass None as the input
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.tasks)
('step_2',)
(PregelTask(id='365d4518-bcff-5abd-8ef5-8a0de7f510b0', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),)),)

更新图形状态

为了应对这个问题,我们可以采取几种措施。

首先,我们可以在一个不同的线程上运行图,并使用较短的输入,就像我们在开始时所做的那样。或者,如果我们希望从断点处恢复图的执行,可以更新状态以使其输入长度小于5个字符(这是我们的中断条件)。

# NOTE: this update will be applied as of the last successful node before the interrupt, i.e. `step_1`, right before the node with an interrupt
graph.update_state(config=thread_config, values={"input": "foo"})
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.values)
---Step 2---
{'input': 'foo'}
---Step 3---
{'input': 'foo'}
()
{'input': 'foo'}
您也可以更新状态 作为节点 step_2(中断节点),这样会完全跳过该节点。

initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "3"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}

# NOTE: this update will skip the node `step_2` altogether
graph.update_state(config=thread_config, values=None, as_node="step_2")
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

state = graph.get_state(thread_config)
print(state.next)
print(state.values)
---Step 3---
{'input': 'hello world'}
()
{'input': 'hello world'}

Comments