Skip to content

人机交互

interrupt 函数是新引入的。

自 LangGraph 0.2.57 版本以来,推荐使用新的 [interrupt 函数][langgraph.types.interrupt] 来设置断点,因为它简化了 人机交互 模式。

如果您正在寻找之前的版本(该版本依赖于静态断点和 NodeInterrupt 异常),可以在这里查看:这里

人机交互(或“人在回路中”)的工作流将人类输入集成到自动化过程中,在关键阶段允许决策、验证或纠正。这在基于 大语言模型的应用 中特别有用,因为底层模型可能会偶尔生成不准确的内容。在低容错场景中,如合规性、决策制定或内容生成,人类参与确保了可靠性,通过审查、纠正或覆盖模型输出来实现。

[langgraph.types.interrupt]:

使用场景

基于LLM的应用程序中**人机协作**工作流的关键使用场景包括:

  1. 🛠️ 审查工具调用: 人类可以在LLM请求工具执行之前审查、编辑或批准工具调用。
  2. ✅ 验证LLM输出: 人类可以审查、编辑或批准由LLM生成的内容。
  3. 💡 提供上下文: 允许LLM明确请求人类输入以澄清或提供额外细节,或支持多轮对话。

中断

LangGraph中的中断函数通过在特定节点暂停图,向人类展示信息,并根据他们的输入恢复图来支持人机协作的工作流程。此功能适用于诸如批准、编辑或收集额外输入等任务。中断函数Command对象结合使用,以便用人类提供的值恢复图。

API Reference: interrupt

from langgraph.types import interrupt

def human_node(state: State):
    value = interrupt(
        # 任何可序列化的JSON值以供人类查看。
        # 例如,一个问题、一段文本或状态中的一组键
       {
          "text_to_revise": state["some_text"]
       }
    )
    # 使用人类的输入更新状态或将图路由到基于输入的位置。
    return {
        "some_text": value
    }

graph = graph_builder.compile(
    checkpointer=checkpointer # 必须用于`中断`才能工作
)

# 运行图直到中断
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(some_input, config=thread_config)

# 使用人类的输入恢复图
graph.invoke(Command(resume=value_from_human), config=thread_config)
{'some_text': '编辑后的文本'}

Warning

中断既强大又符合人体工程学。然而,尽管它们可能在开发人员体验方面类似于Python的input()函数,但重要的是要注意,它们不会自动从中断点继续执行。相反,它们会重新运行中断所在的整个节点。 因此,中断通常最好放置在一个节点的开始处或专门为此目的创建的一个节点中。请参阅从中断恢复如何工作部分以获取更多详细信息。

完整代码

如果您想查看如何在图中使用中断的完整示例,请参考以下代码。

from typing import TypedDict
import uuid

from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command

class State(TypedDict):
   """图的状态。"""
   some_text: str

def human_node(state: State):
   value = interrupt(
      # 任何可序列化的JSON值以供人类查看。
      # 例如,一个问题、一段文本或状态中的一组键
      {
         "text_to_revise": state["some_text"]
      }
   )
   return {
      # 使用人类的输入更新状态
      "some_text": value
   }


# 构建图
graph_builder = StateGraph(State)
# 将human_node添加到图中
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")

# 中断功能需要一个检查点器才能工作。
checkpointer = MemorySaver()
graph = graph_builder.compile(
   checkpointer=checkpointer
)

# 将线程ID传递给图以运行它。
thread_config = {"configurable": {"thread_id": uuid.uuid4()}}

# 使用stream()直接显示`__interrupt__`信息。
for chunk in graph.stream({"some_text": "原始文本"}, config=thread_config):
   print(chunk)

# 使用Command恢复
for chunk in graph.stream(Command(resume="编辑后的文本"), config=thread_config):
   print(chunk)
{'__interrupt__': (
      Interrupt(
         value={'question': '请修改文本', 'some_text': '原始文本'}, 
         resumable=True, 
         ns=['human_node:1a7b5e4f-3688-c8c6-0d0a-ec61a43fecd6'], 
         when='期间'
      ),
   )
}
{'human_node': {'some_text': '编辑后的文本'}}

使用要求

要在图中使用中断功能,你需要:

  1. 指定一个检查点器,以便在每一步后保存图的状态。
  2. 调用interrupt(),在适当的位置调用它。参见设计模式部分获取示例。
  3. 使用一个线程ID运行图,直到遇到中断
  4. 使用invoke/ainvoke/stream/astream恢复执行(参见命令原语)。

设计模式

在人类参与的工作流中,通常有三种不同的**操作**:

  1. 批准或拒绝:在关键步骤(如API调用)之前暂停工作流以审核并批准操作。如果操作被拒绝,则可以阻止工作流执行该步骤,并可能采取替代行动。这种模式通常涉及根据人的输入来**路由**工作流。
  2. 编辑图状态:暂停工作流以审核和编辑图的状态。这有助于纠正错误或使用额外信息更新状态。这种模式通常涉及使用人的输入来**更新**状态。
  3. 获取输入:在图中的特定步骤显式请求人类输入。这有助于收集额外信息或上下文,以支持代理的决策过程或支持**多轮对话**。

以下展示了可以使用这些**操作**实现的不同设计模式。

批准或拒绝

image

根据人类的批准或拒绝,工作流可以继续执行操作或采取替代路径。

在关键步骤(如API调用)之前暂停工作流以审核并批准操作。如果操作被拒绝,则可以阻止工作流执行该步骤,并可能采取替代行动。

API Reference: interrupt | Command

from typing import Literal
from langgraph.types import interrupt, Command

def human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:
    is_approved = interrupt(
        {
            "question": "这是正确的吗?",
            # 显示应由人类审核和批准的输出
            "llm_output": state["llm_output"]
        }
    )

    if is_approved:
        return Command(goto="some_node")
    else:
        return Command(goto="another_node")

# 将节点添加到图中的适当位置
# 并将其连接到相关节点。
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)

# 运行图并遇到中断后,图将暂停。
# 使用批准或拒绝恢复它。
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config)

有关更详细示例,请参阅如何审核工具调用

审核及编辑状态

image

人类可以审核和编辑图的状态。这有助于纠正错误或使用额外信息更新状态。

API Reference: interrupt

from langgraph.types import interrupt

def human_editing(state: State):
    ...
    result = interrupt(
        # 提供给客户端的中断信息。
        # 可以是任何可序列化的JSON值。
        {
            "task": "审核LLM生成的摘要并进行必要的编辑。",
            "llm_generated_summary": state["llm_generated_summary"]
        }
    )

    # 使用编辑后的文本更新状态
    return {
        "llm_generated_summary": result["edited_text"] 
    }

# 将节点添加到图中的适当位置
# 并将其连接到相关节点。
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)

...

# 运行图并遇到中断后,图将暂停。
# 使用编辑后的文本恢复它。
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(
    Command(resume={"edited_text": "编辑后的文本"}), 
    config=thread_config
)

有关更详细示例,请参阅如何等待用户输入

审核工具调用

image

人类可以在继续之前审核和编辑LLM的输出。这在应用程序中尤为重要,其中LLM请求的工具调用可能是敏感的或需要人工监督的。
def human_review_node(state) -> Command[Literal["call_llm", "run_tool"]]:
    # 这是我们将通过Command(resume=<human_review>)提供的值
    human_review = interrupt(
        {
            "question": "这是正确的吗?",
            # 显示工具调用以供审核
            "tool_call": tool_call
        }
    )

    review_action, review_data = human_review

    # 批准工具调用并继续
    if review_action == "continue":
        return Command(goto="run_tool")

    # 手动修改工具调用然后继续
    elif review_action == "update":
        ...
        updated_msg = get_updated_msg(review_data)
        # 记住要修改现有消息,您需要传递具有匹配ID的消息。
        return Command(goto="run_tool", update={"messages": [updated_message]})

    # 给出自然语言反馈,然后将其返回给代理
    elif review_action == "feedback":
        ...
        feedback_msg = get_feedback_msg(review_data)
        return Command(goto="call_llm", update={"messages": [feedback_msg]})

有关更详细示例,请参阅如何审核工具调用

多轮对话

image

一个多轮对话架构,其中代理人类节点来回循环,直到代理决定将对话移交给另一个代理或系统的另一部分。

**多轮对话**涉及代理与人类之间的多次往返交互,允许代理以对话方式从人类那里收集更多信息。

此设计模式适用于包含多个代理的LLM应用。一个或多个代理可能需要与人类进行多轮对话,在对话的不同阶段提供输入或反馈。为了简化,下面的代理实现被描述为单个节点,但实际上它可能是由多个节点组成的更大图的一部分,并包括条件边。

在此模式中,每个代理都有自己的人类节点用于收集用户输入。可以通过将人类节点命名为唯一名称(例如,“代理1的人类”,“代理2的人类”)或使用子图来实现,其中子图包含一个人类节点和一个代理节点。

from langgraph.types import interrupt

def human_input(state: State):
    human_message = interrupt("human_input")
    return {
        "messages": [
            {
                "role": "human",
                "content": human_message
            }
        ]
    }

def agent(state: State):
    # 代理逻辑
    ...

graph_builder.add_node("human_input", human_input)
graph_builder.add_edge("human_input", "agent")
graph = graph_builder.compile(checkpointer=checkpointer)

# 运行图并遇到中断后,图将暂停。
# 使用人类的输入恢复它。
graph.invoke(
    Command(resume="你好!"),
    config=thread_config
)

在此模式中,单个人类节点用于为多个代理收集用户输入。活动代理从状态中确定,因此在收集输入后,图可以路由到正确的代理。

from langgraph.types import interrupt

def human_node(state: MessagesState) -> Command[Literal["agent_1", "agent_2", ...]]:
    """用于收集用户输入的节点。"""
    user_input = interrupt(value="准备接收用户输入。")

    # 从状态中确定**活动代理**,以便
    # 我们可以在收集输入后路由到正确的代理。
    # 例如,向状态添加字段或使用最后一个活动代理。
    # 或填充代理生成的AI消息的`name`属性。
    active_agent = ... 

    return Command(
        update={
            "messages": [{
                "role": "human",
                "content": user_input,
            }]
        },
        goto=active_agent,
    )

有关更详细示例,请参阅如何实现多轮对话

验证人类输入

如果您需要在图本身内部(而不是在客户端上)验证人类提供的输入,可以通过在单个节点内使用多个中断调用来实现。

API Reference: interrupt

from langgraph.types import interrupt

def human_node(state: State):
    """带验证的人类节点。"""
    question = "你的年龄是多少?"

    while True:
        answer = interrupt(question)

        # 验证答案,如果答案无效则再次请求输入。
        if not isinstance(answer, int) or answer < 0:
            question = f"'{answer}'不是一个有效的年龄。你的年龄是多少?"
            answer = None
            continue
        else:
            # 如果答案有效,我们可以继续。
            break

    print(f"人类参与的年龄是{answer}岁。")
    return {
        "age": answer
    }

Command 原语

使用 interrupt 函数时,图形将在中断处暂停并等待用户输入。

可以通过 Command 原语来恢复图形执行,该原语可以通过 invokeainvokestreamastream 方法传递。

Command 原语提供了几种选项来控制和修改恢复过程中图形的状态:

  1. interrupt 传递值:通过 Command(resume=value) 将数据(如用户的响应)提供给图形。执行从使用 interrupt 的节点开始处恢复,但这次 interrupt(...) 调用将返回在 Command(resume=value) 中传递的值,而不是暂停图形。

    # 使用用户的输入恢复图形执行。
    graph.invoke(Command(resume={"age": "25"}), thread_config)
    
  2. 更新图形状态:使用 Command(update=update) 修改图形状态。请注意,恢复是从使用 interrupt 的节点开始处进行的。执行从使用 interrupt 的节点开始处恢复,但使用更新后的状态。

    # 更新图形状态并恢复。
    # 如果使用了 `interrupt`,则必须提供一个 `resume` 值。
    graph.invoke(Command(update={"foo": "bar"}, resume="Let's go!!!"), thread_config)
    

通过利用 Command,您可以恢复图形执行、处理用户输入,并动态调整图形的状态。

中断恢复是如何工作的?

Warning

从一个 中断 恢复执行与 Python 的 input() 函数不同,在 input() 函数被调用的地方继续执行。

使用 中断 的关键在于理解恢复机制。当你在触发最后一个 中断 后恢复执行时,图执行将从该 图节点开始 处重新启动。

所有 从节点开始到 中断 的代码都将被重新执行。

counter = 0
def node(state: State):
    # 当图恢复时,从节点开始到中断的所有代码都将被重新执行。
    global counter
    counter += 1
    print(f"> 进入节点:{counter} 次")
    # 暂停图并等待用户输入。
    answer = interrupt()
    print("计数器的值是:", counter)
    ...

恢复 图之后,计数器将第二次递增,导致以下输出:

> 进入节点:2 次
计数器的值是:2

使用一次调用来恢复多个中断

如果你的任务队列中有多个中断,你可以使用 Command.resume 和一个映射中断 ID 到恢复值的字典来在一个 invoke / stream 调用中恢复多个中断。

例如,一旦你的图被中断(理论上可以多次)并且停滞不前:

resume_map = {
    i.interrupt_id: f"人类输入的提示{i.value}"
    for i in parent.get_state(thread_config).interrupts
}

parent_graph.invoke(Command(resume=resume_map), config=thread_config)

常见陷阱

旁效作用

将具有旁效作用的代码(如API调用)放置在interrupt之后,以避免重复执行,因为每次节点恢复时这些操作都会重新触发。

当节点从interrupt恢复时,这段代码会再次执行API调用。 如果API调用不是幂等的或只是昂贵的操作,这可能会导致问题。

from langgraph.types import interrupt

def human_node(state: State):
    """带验证的人类节点。"""
    api_call(...) # 这段代码会在节点恢复时再次执行。
    answer = interrupt(question)
from langgraph.types import interrupt

def human_node(state: State):
    """带验证的人类节点。"""

    answer = interrupt(question)

    api_call(answer) # 正确,因为它在中断之后执行
from langgraph.types import interrupt

def human_node(state: State):
    """带验证的人类节点。"""

    answer = interrupt(question)

    return {
        "answer": answer
    }

def api_call_node(state: State):
    api_call(...) # 正确,因为它在一个单独的节点中执行

子图作为函数调用

当调用子图作为函数时,**父图**将在**调用子图的节点开始处**继续执行(并且在那里触发了interrupt)。同样,**子图**将从**调用interrupt()函数的节点开始处**继续执行。

例如,

def node_in_parent_graph(state: State):
    some_code()  # <-- 当子图恢复时,此代码将重新执行。
    # 作为函数调用一个子图。
    # 子图包含一个`interrupt`调用。
    subgraph_result = subgraph.invoke(some_input)
    ...
示例:父图和子图执行流程

假设我们有一个包含三个节点的父图:

父图node_1node_2(子图调用) → node_3

而子图也有三个节点,其中第二个节点包含一个interrupt

子图sub_node_1sub_node_2interrupt) → sub_node_3

当恢复图时,执行将按以下顺序进行:

  1. 跳过父图中的node_1(已执行,图状态保存在快照中)。
  2. **重新执行父图中的node_2**从开始处。
  3. 跳过子图中的sub_node_1(已执行,图状态保存在快照中)。
  4. **重新执行子图中的sub_node_2**从开始处。
  5. 继续执行sub_node_3及后续节点。

这里是简化示例代码,你可以使用它来理解带有中断的子图如何工作。 它计算每个节点被进入的次数并打印计数。

import uuid
from typing import TypedDict

from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver


class State(TypedDict):
   """图的状态。"""
   state_counter: int


counter_node_in_subgraph = 0

def node_in_subgraph(state: State):
   """子图中的一个节点。"""
   global counter_node_in_subgraph
   counter_node_in_subgraph += 1  # 这段代码将**不会**再次运行!
   print(f"进入了`node_in_subgraph`共计{counter_node_in_subgraph}次")

counter_human_node = 0

def human_node(state: State):
   global counter_human_node
   counter_human_node += 1 # 这段代码将再次运行!
   print(f"进入了子图中的`human_node`共计{counter_human_node}次")
   answer = interrupt("你的名字是什么?")
   print(f"得到了答案{answer}")


checkpointer = MemorySaver()

subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)


counter_parent_node = 0

def parent_node(state: State):
   """这个父节点将调用子图。"""
   global counter_parent_node

   counter_parent_node += 1 # 这段代码将在恢复时再次运行!
   print(f"进入了`parent_node`共计{counter_parent_node}次")

   # 请注意,我们故意增加图状态中的状态计数器
   # 以演示子图更新相同键不会与父图冲突(直到
   subgraph_state = subgraph.invoke(state)
   return subgraph_state


builder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")

# 中断功能需要启用检查点器!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
   "configurable": {
      "thread_id": uuid.uuid4(),
   }
}

for chunk in graph.stream({"state_counter": 1}, config):
   print(chunk)

print('--- 恢复 ---')

for chunk in graph.stream(Command(resume="35"), config):
   print(chunk)

这将打印出

进入了`parent_node`共计1次
进入了`node_in_subgraph`共计1次
进入了子图中的`human_node`共计1次
{'__interrupt__': (Interrupt(value='你的名字是什么?', resumable=True, ns=['parent_node:4c3a0248-21f0-1287-eacf-3002bc304db4', 'human_node:2fe86d52-6f70-2a3f-6b2f-b1eededd6348'], when='during'),)}
--- 恢复 ---
进入了`parent_node`共计2次
进入了子图中的`human_node`共计2次
得到了答案35
{'parent_node': {'state_counter': 1}}

使用多个中断

在**单个**节点内使用多个中断有助于实现类似验证人类输入的模式。然而,如果处理不当,在同一节点内使用多个中断会导致意外行为。

当一个节点包含多个中断调用时,LangGraph会维护一个特定于正在执行该节点的任务的恢复值列表。每当执行恢复时,它从节点的开头开始。对于遇到的每个中断,LangGraph会检查任务的恢复列表中是否存在匹配值。匹配是**严格基于索引的**,因此节点内部中断调用的顺序至关重要。

为了避免问题,请勿在执行之间动态更改节点结构。这包括添加、删除或重新排序中断调用,因为此类更改可能导致索引不匹配。这些问题通常源于非传统模式,例如通过Command(resume=..., update=SOME_STATE_MUTATION)命令动态修改状态或依赖全局变量动态修改节点结构。

错误代码示例
import uuid
from typing import TypedDict, Optional

from langgraph.graph import StateGraph
from langgraph.constants import START 
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver


class State(TypedDict):
    """图的状态。"""

    age: Optional[str]
    name: Optional[str]


def human_node(state: State):
    if not state.get('name'):
        name = interrupt("你的名字是什么?")
    else:
        name = "N/A"

    if not state.get('age'):
        age = interrupt("你的年龄是多少?")
    else:
        age = "N/A"

    print(f"名字:{name}。 年龄:{age}")

    return {
        "age": age,
        "name": name,
    }


builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")

# 中断功能需要启用检查点器!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
    "configurable": {
        "thread_id": uuid.uuid4(),
    }
}

for chunk in graph.stream({"age": None, "name": None}, config):
    print(chunk)

for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):
    print(chunk)
{'__interrupt__': (Interrupt(value='你的名字是什么?', resumable=True, ns=['human_node:3a007ef9-c30d-c357-1ec1-86a1a70d8fba'], when='during'),)}
名字:N/A。 年龄:John
{'human_node': {'age': 'John', 'name': 'N/A'}}

额外资源 📚

Comments