AI生成的LangGraph插画
在这非常详细的教程中,我们将探讨LangGraph——一个用于管理或调度大型语言模型(LLM,Làngdà Yǔyán Móxíng)复杂多步工作流的强大库——并将其应用于一个常见的电商问题:根据用户的查询决定是下单还是取消订单。通过这篇博客,您将了解如何:
- 在Python环境中安装或配置LangGraph。
- 加载并管理数据,比如库存和客户。
- 定义节点(工作流中的各个任务)。
- 构建节点和边的图结构,包括条件分支。
- 可视化并测试工作流程。
我们将一步步进行,详细解释每个概念——非常适合初学者和希望使用LLM(大型语言模型)创建动态或循环工作流程的用户。我还加入了数据集的链接,以方便大家进行实操练习。
目录:什么是 LangGraph?什么是LangGraph?
订单管理问题
导入解释
数据加载和状态定义
创建工具并集成LLM
定义工作节点
构建流程图
可视化并测试流程
结论
(LangGraph 是什么?)
LangGraph 是一个将基于图的方法引入 LangChain 工作流程的库。传统的工作流程通常是线性地从一个步骤进行到另一个步骤,但实际任务经常需要分支、条件判断,甚至包括重试失败的步骤、澄清用户输入等操作。
LangGraph的主要特性:
- 节点:单一的任务或功能(例如,检查库存,计算运输费用)。
- 边:定义节点之间数据和控制的流动情况。可以有条件地进行。
- 共享状态(Shared State):每个节点可以返回更新全局状态对象的数据,无需手动传递。
- 工具集成:轻松整合外部工具或函数,LLM可以调用这些工具或函数。
- 人机交互(可选):插入需要人工审查的节点。
在此场景中,用户的查询可能涉及新下单或取消已有订单。
- 下单:检查商品库存,计算运费,并模拟付款。
- 取消订单:提取
order_id
并将订单标记为取消。
因为我们得选择分支(决定“PlaceOrder” vs. “CancelOrder”),我们将使用LangGraph(这里指特定的流程构建工具)来创建一个条件分支流程,
- 分类处理 查询。
- 如果 下订单,转至检查库存、发货和支付情况。
- 如果 取消订单,解析出
order_id
并调用取消订单的工具。
以下是您提供的代码的第一部分,展示了导入和环境设置情况。我们还在代码后面添加了注释,解释了每个部分的内容。
# 导入所需的库文件
import os
import pandas as pd
import random
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.runnables.graph import MermaidDrawMethod
from IPython.display import display, Image
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from typing import Dict, TypedDict
# 设置环境变量
os.environ["OPENAI_API_KEY"] = ""
langchain_core.tools, langchain_openai, ToolNode 等 :
tool
(一个装饰器工具)将 Python 函数转换为 LLM 可以调用的工具。ChatOpenAI
是与 GPT 模型通信的客户端。ToolNode
是来自langgraph.prebuilt
的预构建节点,用于处理工具执行。StateGraph
,MessagesState
,START
,END
来自langgraph.graph
,它们对于定义我们的工作流非常重要。MermaidDrawMethod
帮助将工作流可视化为 Mermaid.js 图。
数据集链接:数据集.
在接下来的代码片段中,我们加载CSV文件(分别用于库存和客户),并将它们转换为字典。我们还定义了我们的 State 类型字典。
# 加载数据集:
inventory_df = pd.read_csv("inventory.csv")
customers_df = pd.read_csv("customers.csv")
# 将数据集转换为字典,
inventory = inventory_df.set_index("item_id").T.to_dict()
customers = customers_df.set_index("customer_id").T.to_dict()
class State(TypedDict): # 类型字典
query: str
category: str
next_node: str
item_id: str # 商品ID
order_status: str # 订单状态
cost: str
payment_status: str # 支付状态
location: str # 位置
quantity: int
CSV转字典:
inventory
和customers
是以item_id
或customer_id
为键的字典数据结构。这使得像inventory[item_51]
这样的查找很方便。
情况 :
- 一个类型化的字典,这样我们就能知道要期待哪些字段。例如,
query
,category
,item_id
等。 category
通常为“PlaceOrder”或“CancelOrder”。next_node
可以存储下一个节点的名称,尽管我们依赖图的边来进行跳转。- 这有助于跟踪所有内容——库存检查、支付状态等——在一个单一的对象内。
现在我们定义我们的LLM和工具。主要工具是cancel_order
,它利用LLM从查询中提取出order_id
。
@工具装饰器
def 取消订单(query: str) -> dict:
"""模拟订单取消"""
order_id = llm.with_structured_output(method='json_mode').invoke(f'从以下文本中提取订单ID并以JSON格式返回: {query}')['order_id']
#amount = query.get("amount")
if not order_id:
return {"错误": "缺少订单ID"}
return {"订单状态": "订单已取消成功"}
# 初始化LLM并绑定工具
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tools_2 = [取消订单]
llm_with_tools_2 = llm.bind_tools(tools_2)
tool_node_2 = ToolNode(tools_2)
**@工具**
:
cancel_order
函数现在是当 LLM 认为需要取消订单时可以调用的工具。
提取出**order_id**
:
- 我们调用
llm.with_structured_output(method='json_mode')
来让 LLM 返回 JSON 格式。然后我们解析出order_id
。
初始化LLM :
- 选择的模型是
model="gpt-4o-mini"
,temperature=0
用于生成确定的回复。
绑定及**工具节点**
:
llm.bind_tools(tools_2)
将我们的 LLM 与cancel_order
工具绑定在一起。ToolNode
是一个专门的节点,可以自动管理这些绑定的工具。
我们现在将开始一个个定义节点。
调用模型的节点
这些节点可以访问可以调用的模型功能。
def call_model_2(state: MessagesState):
"""用来决定接下来的步骤的大型语言模型。"""
messages = state["messages"]
response = llm_with_tools_2.invoke(str(messages))
return {"messages": [response]}
def call_tools_2(state: MessagesState) -> Literal["tools_2", END]:
"""根据工具调用决定工作流走向。"""
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
return "tools_2"
return END
**call_model_2**
:接收对话 (messages
) 并将它们传递给带有绑定工具的 LLM。如果 LLM 触发了工具调用,我们会在call_tools_2
中检测到该工具调用。**call_tools_2**
:检查 LLM 是否请求了工具调用 (tool_calls
)。如果是,我们将请求转发到"tools_2"
,即ToolNode
;否则,流程结束。
在此,我们定义如下一个用于分类查询的节点:
def 处理查询(state: MessagesState) -> MessagesState:
"""将用户查询归类为PlaceOrder或CancelOrder。\n回复应仅包含'PlaceOrder'或'CancelOrder'。"""
prompt = ChatPromptTemplate.from_template(
"将用户查询归类为PlaceOrder或CancelOrder"
"请回答'PlaceOrder'或'CancelOrder' 查询内容: {state}"
)
chain = prompt | ChatOpenAI(temperature=0)
category = chain.invoke({"state": state}).content
返回 {'query': state, 'category': category}
- 此节点使用大型语言模型来处理用户的查询,返回值会设置
category
。
def 检查库存状态(state: 消息状态) -> 消息状态:
"""检查请求的项目是否在库存中。"""
item_id = llm.with_structured_output(method='JSON模式').invoke(f'从以下文本中提取 item_id,格式为 JSON: {state}')['item_id']
quantity = llm.with_structured_output(method='JSON模式').invoke(f'从以下文本中提取 quantity,格式为 JSON: {state}')['quantity']
if not item_id or not quantity:
return {"error": "缺少 'item_id' 或 'quantity'."}
if 库存.get(item_id, {}).get("stock", 0) >= quantity:
print(f"{item_id} 有货")
return {"status": "有货"}
return {"query": state, "order_status": "无货"}
- 尝试从对话中解析出
item_id
和quantity
。 - 检查库存中的
inventory[item_id]["stock"]
是否有货。
我们为计算特定顾客的物流费用定义了一个节点。
def compute_shipping(state: MessagesState) -> MessagesState:
"""计算运费。"""
item_id = llm.with_structured_output(method='json_mode').invoke(f'从如下文本中以json格式提取item_id: {state}')['item_id']
quantity = llm.with_structured_output(method='json_mode').invoke(f'从如下文本中以json格式提取quantity: {state}')['quantity']
customer_id = llm.with_structured_output(method='json_mode').invoke(f'从如下文本中以json格式提取customer_id: {state}')['customer_id']
location = customers[customer_id]['location']
if not item_id or not quantity or not location:
return {"error": "缺少 'item_id'、'quantity' 或 'location'。"}
weight_per_item = inventory[item_id]["weight"]
total_weight = weight_per_item * quantity
rates = {"local": 5, "domestic": 10, "international": 20}
cost = total_weight * rates.get(location, 10)
print(f"打印运费和位置: {cost},{location}")
return {"query": state, "cost": f"${cost:.2f}"}
- 从用户的请求中提取customer_id,然后在
customers
字典中查找他们的位置信息。 - 根据商品的重量、数量以及用户的所在位置来计算运费。
我们要定义一个处理支付的节点:
def process_payment(state: State) -> State:
"""处理支付"""
cost = llm.with_structured_output(method='json_mode').invoke(f'从以下文本中提取费用(格式为json):{state}')
if not cost:
return {"error": "缺少'amount'."}
print(f"支付处理成功: {cost},订单已下单!")
payment_outcome = random.choice(["Success", "Failed"])
return {'支付结果': payment_outcome}
-
使用
random.choice
来模拟成功的概率。 - 在实际系统中,你可能需要集成一个真实的支付网关。
我们现在来定义一个用于路由查询的节点。
def route_query_1(state: State) -> str:
"""根据查询的类别来为其分配路由."""
print(state)
if state["category"] == "PlaceOrder":
return "PlaceOrder"
elif state["category"] == "CancelOrder":
return "CancelOrder"
- 决定下一步走哪个路径:“PlaceOrder” 或 “CancelOrder”。在 LangGraph 中,我们将“PlaceOrder” 映射到检查库存节点,而“CancelOrder” 映射到取消订单节点。
以下,我们创建一个名为 StateGraph
的对象,添加节点,并定义边和条件性边,。
# 创建工作流
workflow = StateGraph(MessagesState)
# 添加节点
workflow.add_node("RouteQuery", categorize_query)
workflow.add_node("CheckInventory", check_inventory)
workflow.add_node("ComputeShipping", compute_shipping)
workflow.add_node("ProcessPayment", process_payment)
workflow.add_conditional_edges(
"RouteQuery",
route_query_1,
{
"PlaceOrder": "CheckInventory",
"CancelOrder": "CancelOrder"
}
)
workflow.add_node("CancelOrder", call_model_2)
workflow.add_node("tools_2", tool_node_2)
# 定义这些边
workflow.add_edge(开始, "RouteQuery")
workflow.add_edge("CheckInventory", "ComputeShipping")
workflow.add_edge("ComputeShipping", "ProcessPayment")
workflow.add_conditional_edges("取消订单", call_tools_2)
workflow.add_edge("工具2", "取消订单")
workflow.add_edge("ProcessPayment", 结束)
**StateGraph(消息状态图)**
:
- 我们使用
MessagesState
来存储对话数据。
节点:
RouteQuery
是识别用户意图的入口节点。- “检查库存”、“计算运费”和“处理支付”处理下单流程。
- “CancelOrder” 和 “工具_2” 处理取消订单的流程。
条件性边缘:
- 调用
workflow.add_conditional_edges("RouteQuery", route_query_1, ...)
确保在它是“PlaceOrder”时我们转到CheckInventory,如果是“CancelOrder”则转到CancelOrder。
循环:
- 当用户点击“取消订单”时,我们会检查LLM是否调用了工具 (
call_tools_2
)。如果调用了,我们就进入tools_2
(即ToolNode
);在工具调用之后,它会返回到“取消订单”,给LLM一个机会来执行进一步的操作或结束。
结束:
- “处理付款”最终导致
END
,结束了“下单”流程。
接下来的部分将工作流编译并打包成一个代理,然后将其渲染成Mermaid图,并用示例查询测试它。
# 编译工作流
agent = workflow.compile()
# 可视化工作流
mermaid_graph = agent.get_graph()
mermaid_png = mermaid_graph.draw_mermaid_png(draw_method=MermaidDrawMethod.API)
display(Image(mermaid_png))
# 查询工作流
user_query = "我想取消订单编号 223"
for chunk in agent.stream(
{"messages": [("user", user_query)]},
stream_mode="values",
):
chunk["messages"][-1].pretty_print()
auser_query = "customer_id: customer_14 : 我想订购 item_51,数量为4,且是国内订单"
for chunk in agent.stream(
{"messages": [("user", auser_query)]},
stream_mode="values",
):
chunk["messages"][-1].pretty_print()
编译:
agent = workflow.compile()
这行代码将我们的节点及边定义转换成可执行的智能代理。
想象一下:
我们有一个叫mermaid_png
的美人鱼图,可以在Jupyter笔记本中展示,用于调试或演示。
测试查询语句:
- 第一个测试用例:我希望取消订单号为 223 的订单,应被路由到
CancelOrder
。
第一次测试的结果
- 第二个测试用例:“customer_id: customer_14 : 我想要订购 item_51……”应该被分发到下单流程。
第二次测试的结果
所以,结论是:利用LangGraph,我们创建了一个根据用户意图动态调整的分支工作流程。根据用户的意图,工作流程会自动下单或取消订单。我们现场演示了:
- 如何使用LLM节点(
categorize_query
)分类查询。 - 如何绑定工具函数(
cancel_order
)并将其集成到工作流程中。 - 如何通过独立的节点检查库存、计算运费以及处理付款。
- 用Mermaid.js可视化整个工作流程。
这种方法很灵活,可以扩展:你可以根据需要添加更多步骤,比如地址验证和促销码,或者增加新的分支,例如更新现有订单,而无需重写单一的脚本。如果你需要重试失败的支付或验证用户确认,可以使用循环。LangGraph也可以处理这些需求。