Skip to content

流式处理

流式处理是构建响应式应用程序的关键。有几种类型的数据您希望进行流式处理:

  1. 代理进度 — 在代理图中的每个节点执行后获取更新。
  2. 语言模型标记 — 在语言模型生成标记时进行流式传输。
  3. 自定义更新 — 在工具执行期间发出自定义数据(例如,“已获取10/100条记录”)

您可以同时流式处理多种类型的数据

image

等待是鸽子的事。

代理进度

要流式传输代理进度,请使用stream()astream()方法,并设置stream_mode="updates"(langchain-ai.github.io/langgraph/how-tos/streaming/#updates)。这将在每个代理步骤后发出一个事件。

例如,如果你有一个只调用一次工具的代理,你应该看到以下更新:

  • LLM节点:包含工具调用请求的人工智能消息
  • 工具节点:包含执行结果的工具消息
  • LLM节点:最终的人工智能响应
agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    stream_mode="updates"
):
    print(chunk)
    print("\n")
agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)
async for chunk in agent.astream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    stream_mode="updates"
):
    print(chunk)
    print("\n")

大型语言模型(LLM)令牌

要实时流式传输由LLM生成的令牌,请使用stream_mode="messages"

agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)
for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "旧金山的天气如何"}]},
    stream_mode="messages"
):
    print("Token", token)
    print("Metadata", metadata)
    print("\n")
agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)
async for token, metadata in agent.astream(
    {"messages": [{"role": "user", "content": "旧金山的天气如何"}]},
    stream_mode="messages"
):
    print("Token", token)
    print("Metadata", metadata)
    print("\n")

工具更新

要从正在执行的工具中流式传输更新,您可以使用get_stream_writer

from langgraph.config import get_stream_writer

def get_weather(city: str) -> str:
    """获取给定城市的天气信息。"""
    writer = get_stream_writer()
    # 流式传输任意数据
    writer(f"查询城市:{city}的数据")
    return f"{city}永远阳光明媚!"

agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    stream_mode="custom"
):
    print(chunk)
    print("\n")
from langgraph.config import get_stream_writer

def get_weather(city: str) -> str:
    """获取给定城市的天气信息。"""
    writer = get_stream_writer()
    # 流式传输任意数据
    writer(f"查询城市:{city}的数据")
    return f"{city}永远阳光明媚!"

agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)

async for chunk in agent.astream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    stream_mode="custom"
):
    print(chunk)
    print("\n")

注意事项

如果您在工具内部添加了get_stream_writer,则无法在LangGraph执行上下文之外调用该工具。

同时流式传输多种模式

可以通过传递一个包含多种流式传输模式的列表来指定多种流式传输模式:stream_mode=["updates", "messages", "custom"]

agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)

for stream_mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    stream_mode=["updates", "messages", "custom"]
):
    print(chunk)
    print("\n")
agent = create_react_agent(
    model="anthropic:claude-3-7-sonnet-latest",
    tools=[get_weather],
)

async for stream_mode, chunk in agent.astream(
    {"messages": [{"role": "user", "content": "what is the weather in sf"}]},
    stream_mode=["updates", "messages", "custom"]
):
    print(chunk)
    print("\n")

额外资源

Comments