Skip to content

LangGraph 的运行时(Pregel)

Pregel 实现了 LangGraph 的运行时,管理 LangGraph 应用程序的执行。

编译一个状态图或创建一个入口点会产生一个可以使用输入调用的Pregel实例。

本指南从高层次上解释了运行时,并提供了直接使用 Pregel 实现应用程序的说明。

**注意:**LangGraph 的 Pregel 运行时以 Google 的 Pregel 算法 命名,该算法描述了一种使用图形进行大规模并行计算的有效方法。

概述

在LangGraph中,Pregel将actor模型和**通道**结合到一个应用程序中。actor**从通道读取数据并写入数据到通道。Pregel按照**Pregel算法/**大规模并行计算**模型组织应用程序的执行步骤。

每个步骤包括三个阶段:

  • 计划:确定本步骤要执行哪些**actor**。例如,在第一步中选择订阅特殊**输入**通道的**actor**;在后续步骤中选择订阅前一步更新的通道的**actor**。
  • 执行:并行执行所有选定的**actor**,直到所有**actor**完成,或者有一个失败,或者达到超时时间。在此阶段,通道更新对**actor**不可见,直到下一次步骤开始。
  • 更新:使用本步骤中**actor**写入的值来更新通道。

重复此过程,直到没有**actor**被选中执行,或者达到了最大步骤数。

活动者

一个**活动者**是PregelNode。它订阅通道,从通道中读取数据,并向通道写入数据。可以将其视为Pregel算法中的一个**活动者**。PregelNodes实现了LangChain的Runnable接口。

通道

通道用于在各个角色(PregelNodes)之间通信。每个通道都有一个值类型、一个更新类型以及一个更新函数——该函数接收一系列更新并修改存储的值。通道可以用于从一个链向另一个链发送数据,或者将数据从一个链发送到其自身在未来步骤中的状态。LangGraph提供了一些内置通道:

基本通道:LastValue 和 Topic

  • LastValue:默认通道,存储最后一次发送到通道的值,适用于输入和输出值,或用于从一步传递数据到下一步。
  • Topic:可配置的PubSub主题,适用于在**角色**之间发送多个值,或用于累积输出。可以配置为去重值或在多步中累积值。

高级通道:Context 和 BinaryOperatorAggregate

  • Context:暴露上下文管理器的值,并管理其生命周期。适用于访问需要初始化和/或清理的外部资源;例如,client = Context(httpx.Client)
  • BinaryOperatorAggregate:存储持久化的值,通过将二元操作符应用于当前值和发送到通道的每次更新来更新。适用于在多步中计算聚合值;例如,total = BinaryOperatorAggregate(int, operator.add)

示例

虽然大多数用户会通过StateGraph API或入口点装饰器与Pregel交互,但也可以直接与Pregel进行交互。

以下是几个不同的示例,以帮助您了解Pregel API。

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel 

node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | Channel.write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, Channel 

node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | Channel.write_to("b")
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | Channel.write_to("c")
)


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo', 'c': 'foofoofoofoo'}
from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, Channel 

node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | {
        "b": Channel.write_to("b"),
        "c": Channel.write_to("c")
    }
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | {
        "c": Channel.write_to("c"),
    }
)

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": Topic(str, accumulate=True),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
{'c': ['foofoo', 'foofoofoofoo']}

此示例演示了如何使用二元操作聚合通道来实现一个归约器。

from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, Channel


node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | {
        "b": Channel.write_to("b"),
        "c": Channel.write_to("c")
    }
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | {
        "c": Channel.write_to("c"),
    }
)

def reducer(current, update):
    if current:
        return current + " | " + "update"
    else:
        return update

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})

此示例演示了如何在图中引入循环,通过让一条链写入它订阅的通道来实现。执行将继续,直到通道中写入None值为止。

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel, ChannelWrite, ChannelWriteEntry

example_node = (
    Channel.subscribe_to("value")
    | (lambda x: x + x if len(x) < 10 else None)
    | ChannelWrite(writes=[ChannelWriteEntry(channel="value", skip_none=True)])
)

app = Pregel(
    nodes={"example_node": example_node},
    channels={
        "value": EphemeralValue(str),
    },
    input_channels=["value"],
    output_channels=["value"],
)

app.invoke({"value": "a"})
{'value': 'aaaaaaaaaaaaaaaa'}

高级API

LangGraph提供了两种高级API来创建Pregel应用程序:状态图(图API)函数式API

状态图(图API)是一种更高层次的抽象,简化了Pregel应用程序的创建过程。它允许您定义一个节点和边组成的图。当编译该图时,状态图API会自动为您创建Pregel应用程序。

from typing import TypedDict, Optional

from langgraph.constants import START
from langgraph.graph import StateGraph

class Essay(TypedDict):
    topic: str
    content: Optional[str]
    score: Optional[float]

def write_essay(essay: Essay):
    return {
        "content": f"关于{essay['topic']}的文章",
    }

def score_essay(essay: Essay):
    return {
        "score": 10
    }

builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")

# 编译图。
# 这将返回一个Pregel实例。
graph = builder.compile()

编译后的Pregel实例将与一组节点和通道相关联。您可以打印这些节点和通道以查看它们的内容。

print(graph.nodes)

您将看到类似这样的输出:

{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
 'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
 'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
print(graph.channels)

您应该看到类似这样的输出:

{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
 'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
 'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
 '__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
 'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
 'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
 'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
 'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
 'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
 'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
 'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
 'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
 'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}

函数式API中,您可以使用一个入口点来创建 一个Pregel应用程序。入口点装饰器允许您定义一个接受输入并返回输出的函数。

from typing import TypedDict, Optional

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint

class Essay(TypedDict):
    topic: str
    content: Optional[str]
    score: Optional[float]


checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def write_essay(essay: Essay):
    return {
        "content": f"关于{essay['topic']}的文章",
    }

print("节点:")
print(write_essay.nodes)
print("通道:")
print(write_essay.channels)
节点:
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x7d05e2f9aad0>}
通道:
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7d05e2c906c0>, '__end__': <langgraph.channels.last_value.LastValue object at 0x7d05e2c90c40>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x7d05e1007280>}

Comments