Skip to content

如何添加跨线程持久性(功能API)

前提条件

本指南假设您熟悉以下概念:

LangGraph允许您在**不同的线程**之间持久化数据。例如,您可以将用户信息(如姓名或偏好)存储在一个共享(跨线程)内存中,并在新线程(如新的对话)中重复使用它们。

当使用功能API时,您可以设置它以使用Store接口来存储和检索记忆:

  1. 创建一个Store实例

    from langgraph.store.memory import InMemoryStore, BaseStore
    
    store = InMemoryStore()
    
  2. store实例传递给entrypoint()装饰器,并在函数签名中暴露store参数:

    from langgraph.func import entrypoint
    
    @entrypoint(store=store)
    def workflow(inputs: dict, store: BaseStore):
        my_task(inputs).result()
        ...
    

在此指南中,我们将展示如何构建和使用一个具有共享内存的工作流,该内存通过Store接口实现。

注意

支持本指南中使用的Store API是在LangGraph v0.2.32版本中添加的。

支持本指南中使用的Store API中的__index__和__query__参数是在LangGraph v0.2.54版本中添加的。

提示

如果您需要为StateGraph添加跨线程持久性,请参阅此操作指南

设置

首先,让我们安装所需的包并设置我们的API密钥

pip install -U langchain_anthropic langchain_openai langgraph
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")

设置LangSmith以开发LangGraph

注册LangSmith可以快速发现并解决您的LangGraph项目中的问题,并提高性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序——更多关于如何开始的信息,请参阅此处

示例:具有长期记忆的简单聊天机器人

定义存储

在这个示例中,我们将创建一个工作流,能够检索用户偏好的信息。为此,我们定义了一个 InMemoryStore 对象——它可以将数据存储在内存中,并查询这些数据。

当使用 Store 接口存储对象时,你需要定义两件事:

  • 对象的命名空间(类似于目录的元组)
  • 对象键(类似于文件名)

在我们的示例中,我们将使用 ("memories", <user_id>) 作为命名空间,并为每个新记忆使用随机 UUID 作为键。

重要的是,为了确定用户,我们将通过节点函数的配置关键字参数传递 user_id

让我们先来定义我们的存储吧!

from langgraph.store.memory import InMemoryStore
from langchain_openai import OpenAIEmbeddings

in_memory_store = InMemoryStore(
    index={
        "embed": OpenAIEmbeddings(model="text-embedding-3-small"),
        "dims": 1536,
    }
)

创建工作流

API Reference: RunnableConfig | BaseMessage | entrypoint | task | add_messages | MemorySaver

import uuid

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import BaseMessage
from langgraph.func import entrypoint, task
from langgraph.graph import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore


model = ChatAnthropic(model="claude-3-5-sonnet-latest")


@task
def call_model(messages: list[BaseMessage], memory_store: BaseStore, user_id: str):
    namespace = ("memories", user_id)
    last_message = messages[-1]
    memories = memory_store.search(namespace, query=str(last_message.content))
    info = "\n".join([d.value["data"] for d in memories])
    system_msg = f"You are a helpful assistant talking to the user. User info: {info}"

    # Store new memories if the user asks the model to remember
    if "remember" in last_message.content.lower():
        memory = "User name is Bob"
        memory_store.put(namespace, str(uuid.uuid4()), {"data": memory})

    response = model.invoke([{"role": "system", "content": system_msg}] + messages)
    return response


# NOTE: we're passing the store object here when creating a workflow via entrypoint()
@entrypoint(checkpointer=MemorySaver(), store=in_memory_store)
def workflow(
    inputs: list[BaseMessage],
    *,
    previous: list[BaseMessage],
    config: RunnableConfig,
    store: BaseStore,
):
    user_id = config["configurable"]["user_id"]
    previous = previous or []
    inputs = add_messages(previous, inputs)
    response = call_model(inputs, store, user_id).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

Note

如果您使用的是LangGraph Cloud或LangGraph Studio,则不需要将store传递给入口点装饰器,因为这是自动完成的。

运行工作流!

现在让我们在配置中指定一个用户ID,并告诉模型我们的名字:

config = {"configurable": {"thread_id": "1", "user_id": "1"}}
input_message = {"role": "user", "content": "Hi! Remember: my name is Bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Hello Bob! Nice to meet you. I'll remember that your name is Bob. How can I help you today?

config = {"configurable": {"thread_id": "2", "user_id": "1"}}
input_message = {"role": "user", "content": "what is my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

Your name is Bob.
我们现在可以检查内存中的存储,并验证确实为用户保存了记忆:

for memory in in_memory_store.search(("memories", "1")):
    print(memory.value)
{'data': 'User name is Bob'}
现在让我们为另一个用户运行工作流,以验证关于第一个用户的记忆是自包含的:

config = {"configurable": {"thread_id": "3", "user_id": "2"}}
input_message = {"role": "user", "content": "what is my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()
================================== Ai Message ==================================

I don't have any information about your name. I can only see our current conversation without any prior context or personal details about you. If you'd like me to know your name, feel free to tell me!

Comments