
How to wait for user input using interrupt

Prerequisites

This guide assumes familiarity with human-in-the-loop (HIL) concepts in LangGraph.

Human-in-the-loop (HIL) interactions are crucial for agentic systems. Waiting for user input is a common HIL interaction pattern: it allows the agent to ask the user clarifying questions and pause execution until input is received.

We can implement this in LangGraph using the interrupt() function. interrupt allows us to stop graph execution to collect input from the user, then resume execution with the collected input.
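At its core the pattern is small. Here is a minimal sketch (the node name and state key are illustrative; the full runnable example follows under Simple Usage below):

from langgraph.types import interrupt


def ask_user(state):
    # interrupt() pauses the graph and surfaces this value to the caller.
    # When the caller resumes with Command(resume=...), the resume value
    # becomes the return value of interrupt() here.
    answer = interrupt("What would you like to clarify?")
    return {"user_answer": answer}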

Setup

First, we need to install the required packages:

pip install --quiet -U langgraph langchain_anthropic

Next, we need to set the API key for Anthropic (the LLM we will use):

import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph. See the LangSmith documentation for more on how to get started.
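If you want tracing for the runs below, you can reuse the _set_env helper from above. A minimal, optional sketch using the standard LangSmith environment variables:

# Optional: enable LangSmith tracing (requires a LangSmith account)
_set_env("LANGCHAIN_API_KEY")
os.environ["LANGCHAIN_TRACING_V2"] = "true"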

Simple Usage

Let's explore a basic example of using human feedback. A straightforward approach is to create a node, **human_feedback**, dedicated to collecting user input. This lets us collect feedback at a specific, chosen point in our graph.

Steps:

  1. Call interrupt() inside the **human_feedback** node.
  2. Set up a checkpointer to save the graph's state up to this node.
  3. Use Command(resume=...) to provide the requested value to the **human_feedback** node and resume execution.

API Reference: StateGraph | START | END | Command | interrupt | MemorySaver

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display


class State(TypedDict):
    input: str
    user_feedback: str


def step_1(state):
    print("---Step 1---")
    pass


def human_feedback(state):
    print("---human_feedback---")
    feedback = interrupt("Please provide feedback:")
    return {"user_feedback": feedback}


def step_3(state):
    print("---Step 3---")
    pass


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("human_feedback", human_feedback)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "human_feedback")
builder.add_edge("human_feedback", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))

Run the graph until our interrupt() call in human_feedback:

# Input
initial_input = {"input": "hello world"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="updates"):
    print(event)
    print("\n")
---Step 1---
{'step_1': None}


---human_feedback---
{'__interrupt__': (Interrupt(value='Please provide feedback:', resumable=True, ns=['human_feedback:c723d73a-d2cb-32cf-452f-e147367868bd']),)}
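You can also inspect the paused run programmatically. A short sketch, assuming the graph and thread above: next names the pending node, and the task's interrupts field carries the Interrupt payload.

state = graph.get_state(thread)
print(state.next)                 # e.g. ('human_feedback',)
print(state.tasks[0].interrupts)  # the pending Interrupt(value='Please provide feedback:', ...)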
Now we can provide the user input via Command(resume=...) and continue execution:

# Continue the graph execution
for event in graph.stream(
    Command(resume="go to step 3!"),
    thread,
    stream_mode="updates",
):
    print(event)
    print("\n")
---human_feedback---
{'human_feedback': {'user_feedback': 'go to step 3!'}}


---Step 3---
{'step_3': None}
We can see that the feedback has been added to the state:

graph.get_state(thread).values
{'input': 'hello world', 'user_feedback': 'go to step 3!'}

Agent

In the context of agents, waiting for user feedback is especially useful for asking clarifying questions. To illustrate this, we will create a simple ReAct-style agent that is capable of tool calling.

In this example, we will use Anthropic's chat model along with a **mock tool** (for demonstration purposes only).

Using Pydantic with LangChain

This notebook uses Pydantic v2 BaseModel, which requires langchain-core >= 0.3. Using langchain-core < 0.3 will result in errors due to mixing of Pydantic v1 and v2 BaseModels.
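A quick sanity check (not part of the original guide):

from importlib.metadata import version

print(version("langchain-core"))  # should be >= 0.3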

API Reference: START | tool | ToolNode | END | StateGraph | MemorySaver

# Set up the state
from langgraph.graph import MessagesState, START

# Set up the tool
# We will have one real tool - a search tool
# We'll also have one "fake" tool - an "ask_human" tool
# Here we define any ACTUAL tools
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode


@tool
def search(query: str):
    """Call to surf the web."""
    # This is a placeholder for the actual implementation
    # Don't let the LLM know this though 😊
    return f"I looked up: {query}. Result: It's sunny in San Francisco, but you better look out if you're a Gemini 😈."


tools = [search]
tool_node = ToolNode(tools)

# Set up the model
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-latest")

from pydantic import BaseModel


# We are going to bind all tools to the model
# We have the ACTUAL tools from above, but we also need a mock tool to ask a human
# Since `bind_tools` takes in tools but also just tool definitions,
# We can define a tool definition for `ask_human`
class AskHuman(BaseModel):
    """Ask the human a question"""

    question: str


model = model.bind_tools(tools + [AskHuman])

# Define nodes and conditional edges


# Define the function that determines whether to continue or not
def should_continue(state):
    messages = state["messages"]
    last_message = messages[-1]
    # If there is no function call, then we finish
    if not last_message.tool_calls:
        return END
    # If tool call is asking Human, we return that node
    # You could also add logic here to let some system know that there's something that requires Human input
    # For example, send a slack message, etc
    elif last_message.tool_calls[0]["name"] == "AskHuman":
        return "ask_human"
    # Otherwise if there is, we continue
    else:
        return "action"


# Define the function that calls the model
def call_model(state):
    messages = state["messages"]
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}


# We define a fake node to ask the human
def ask_human(state):
    tool_call_id = state["messages"][-1].tool_calls[0]["id"]
    ask = AskHuman.model_validate(state["messages"][-1].tool_calls[0]["args"])
    location = interrupt(ask.question)
    tool_message = [{"tool_call_id": tool_call_id, "type": "tool", "content": location}]
    return {"messages": tool_message}


# Build the graph

from langgraph.graph import END, StateGraph

# Define a new graph
workflow = StateGraph(MessagesState)

# Define the three nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_node("ask_human", ask_human)

# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")

# We now add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
    path_map=["ask_human", "action", END],
)

# We now add a normal edge from `action` to `agent`.
# This means that after `action` is called, the `agent` node is called next.
workflow.add_edge("action", "agent")

# After we get back the human response, we go back to the agent
workflow.add_edge("ask_human", "agent")

# Set up memory
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile(checkpointer=memory)

display(Image(app.get_graph().draw_mermaid_png()))

Interacting with the Agent

We can now interact with the agent. Let's ask it to find out where the user is located, then tell them the weather.

This should make it use the ask_human tool first, and then use the normal tool.

config = {"configurable": {"thread_id": "2"}}
for event in app.stream(
    {
        "messages": [
            (
                "user",
                "Ask the user where they are, then look up the weather there",
            )
        ]
    },
    config,
    stream_mode="values",
):
    if "messages" in event:
        event["messages"][-1].pretty_print()
================================ Human Message =================================

Ask the user where they are, then look up the weather there
================================== Ai Message ==================================

[{'text': "I'll help you with that. Let me first ask the user about their location.", 'type': 'text'}, {'id': 'toolu_012Z9yyZjvH8xKgMShgwpQZ9', 'input': {'question': 'Where are you located?'}, 'name': 'AskHuman', 'type': 'tool_use'}]
Tool Calls:
  AskHuman (toolu_012Z9yyZjvH8xKgMShgwpQZ9)
 Call ID: toolu_012Z9yyZjvH8xKgMShgwpQZ9
  Args:
    question: Where are you located?

app.get_state(config).next
('ask_human',)

You can see that our graph got interrupted inside the ask_human node, which is now waiting for a location to be provided. We can provide this value by invoking the graph with a Command(resume="<location>") input:

for event in app.stream(
    Command(resume="san francisco"),
    config,
    stream_mode="values",
):
    if "messages" in event:
        event["messages"][-1].pretty_print()
================================== Ai Message ==================================

[{'text': "I'll help you with that. Let me first ask the user about their location.", 'type': 'text'}, {'id': 'toolu_012Z9yyZjvH8xKgMShgwpQZ9', 'input': {'question': 'Where are you located?'}, 'name': 'AskHuman', 'type': 'tool_use'}]
Tool Calls:
  AskHuman (toolu_012Z9yyZjvH8xKgMShgwpQZ9)
 Call ID: toolu_012Z9yyZjvH8xKgMShgwpQZ9
  Args:
    question: Where are you located?
================================= Tool Message =================================

san francisco
================================== Ai Message ==================================

[{'text': "Now I'll search for the weather in San Francisco.", 'type': 'text'}, {'id': 'toolu_01QrWBCDouvBuJPZa4veepLw', 'input': {'query': 'current weather in san francisco'}, 'name': 'search', 'type': 'tool_use'}]
Tool Calls:
  search (toolu_01QrWBCDouvBuJPZa4veepLw)
 Call ID: toolu_01QrWBCDouvBuJPZa4veepLw
  Args:
    query: current weather in san francisco
================================= Tool Message =================================
Name: search

I looked up: current weather in san francisco. Result: It's sunny in San Francisco, but you better look out if you're a Gemini 😈.
================================== Ai Message ==================================

Based on the search results, it's currently sunny in San Francisco. Would you like more specific weather details?
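Putting it together: here is a sketch of a generic driver loop for interrupt-driven runs (the run_with_human helper is hypothetical, not part of LangGraph). It streams until the graph either finishes or pauses on an interrupt, prompts the user with the interrupt's question, and resumes with their answer:

def run_with_human(app, inputs, config):
    to_send = inputs
    while True:
        for event in app.stream(to_send, config, stream_mode="values"):
            if "messages" in event:
                event["messages"][-1].pretty_print()
        state = app.get_state(config)
        if not state.next:
            # Nothing left to run: the graph finished
            return state.values
        # Paused on an interrupt: ask the user and resume with their answer
        question = state.tasks[0].interrupts[0].value
        to_send = Command(resume=input(f"{question} "))

For example, run_with_human(app, {"messages": [("user", "Ask the user where they are, then look up the weather there")]}, {"configurable": {"thread_id": "3"}}) would drive the agent above end to end, pausing once at ask_human.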
