希望能够在这篇文章中综合讨论搭建代理的各种方案,这里的代理定义并不局限于能力边界要求,而是从普通的工作流一直到大规模skill/框架搭建出的Agent。一个合理的Agent不应该盲目的增加复杂度,而应该根据任务的场景、ROI等等因素来衡量方案

工作流

对于工作流,市面上有很多成熟的框架例如n8n coze Dify等等,目前我只用过n8n,所做的工作也比较简单。主要原因是我的场景中并没有太多完全流程化的东西,而工作流的方案往往更适合与这些。

它的基本步骤就是首先要提出一个SOP的流程,然后选一个基模,我认为这里尤其应该注意的是,Deepseek是没有多模态能力的,而是通过OCR来实现图片识别,在搭建工作流时应该权衡经济和多模态能力,是否选择Deepseek。而搭建一个工作流的麻烦点就是各个节点的配置,举一个基础的例子就是搭建邮件工作流,如果是谷歌邮箱,其中的Oauth流程还是挺复杂的。有一个SOP流程+节点配置好,实际上一个工作流就不难搭建了。而对于不同的任务,需要在以上提到的框架中多找找,可能某个框架针对这个任务场景设计的节点要更加优雅和安全。

同时大多数工作流工具主要是DAG,虽然支持Loop,但是远远没有代码灵活。

SDK实现

对于RawSDK 搭配上一个BaseURL+API就能够实现一个Agent了,相比于一个重量级的框架(例如Langchain系列),如果搭建一个自由便捷,或者项目刚刚开始i,要开发一个MVP版本,使用SDK都是一个很好的选择,而且OpenAISDK也提供里越来越多的功能。下面是使用SDK+一个Loop循环实现的一个简单的代理。基本思路就是定义tool->定义schema->调用SDK绑定Tool以及生成Response->进入Loop->给出最终答案并退出

一个简单案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import json
from openai import OpenAI

MAXSTEP = 10

# --- 1. 工具定义 ---
def start_stream(stream_id: str, protocol: str):
if protocol not in ['rtmp', 'hls']:
return json.dumps({"error": "Unsupported protocol"})
else:
return json.dumps({"stream_id": stream_id, "protocol": protocol, "status": "starting"})

def stop_stream(stream_id: str):
return json.dumps({"msg": f"stream_{stream_id} has been stopped."})

def get_stream_status(stream_id: str):
if stream_id == "news_channel":
return json.dumps({"bitrate": 500, "status": "unstable", "fps": 10})
elif stream_id == "sports_channel":
return json.dumps({"bitrate": 4000, "status": "live", "fps": 60})
else:
return json.dumps({"error": "offline"})

# --- 2. Schema 定义 ---
tools_schema = [
{
"type": "function",
"function": {
"name": "start_stream",
"description": "Start a stream with given stream_id and protocol.",
"parameters": {
"type": "object",
"properties": {
"stream_id": {"type": "string"},
"protocol": {"type": "string", "enum": ["rtmp", "hls"]}
},
"required": ["stream_id", "protocol"]
}
}
},
{
"type": "function",
"function": {
"name": "stop_stream",
"description": "Stop a running stream.",
"parameters": {
"type": "object",
"properties": {
"stream_id": {"type": "string"}
},
"required": ["stream_id"]
}
}
},
{
"type": "function",
"function": {
"name": "get_stream_status",
"description": "Get technical status (bitrate, fps) of a stream.",
"parameters": {
"type": "object",
"properties": {
"stream_id": {"type": "string"}
},
"required": ["stream_id"]
}
}
}
]

available_tools = {
"start_stream": start_stream,
"stop_stream": stop_stream,
"get_stream_status": get_stream_status
}

class StreamOpsAgent:
def __init__(self):
self.client = OpenAI(
api_key=
base_url=
)
self.max_steps = MAXSTEP

def run(self, user_input):

messages = [{"role": "user", "content": user_input}]
print(f"UserInput: {user_input}")

step_count = 0
while step_count < self.max_steps:
step_count += 1

response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
tools=tools_schema,
tool_choice="auto"
)
response_message = response.choices[0].message
tool_calls = response_message.tool_calls

# 情况 A: 模型决定调用工具
if tool_calls:

messages.append(response_message)

for tool_call in tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
tool_call_id = tool_call.id

print(f" -> [Tool Call] {function_name} Args: {function_args}")

if function_name in available_tools:
tool_function = available_tools[function_name]
try:

tool_output = tool_function(**function_args)
except Exception as e:
tool_output = json.dumps({"error": str(e)})
else:
tool_output = json.dumps({"error": "Tool not found"})


if not isinstance(tool_output, str):
tool_output = json.dumps(tool_output)

messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"name": function_name,
"content": tool_output
})

# 情况 B: 模型没有调用工具,给出了最终回复
else:
print("Final Response Generated.")
return response_message.content

return "Error: Maximum steps exceeded."

if __name__ == "__main__":
agent = StreamOpsAgent()

print("\n--- Test Case 1 ---")
user_query1 = "帮我用 RTMP 协议启动一个叫 test_live 的流。"
result = agent.run(user_query1)
print("Agent Result:", result)

print("\n--- Test Case 2 (Complex) ---")
user_query2 = "检查一下 news_channel 的状态,如果它不稳定(unstable)或者码率低于 1000,就把它重启一下."
result2 = agent.run(user_query2)
print("Agent Result:", result2)

需要注意的是对于SDK调用有三个阶段,

  1. 首先是很原生的使用Prompt+Regex:在 Prompt 里写死:“如果是要查询天气,请回复 Action: get_weather”,然后通过正则解析得到Tool调用->极不稳定
  2. Functions API:使用Function+Function_call来对已经微调过的模型进行调用,尽管模型会针对函数专门生成json便于解析,但是这种方式不支持并行,其接口是在response中定义

    functions=tools,
    function_call=”auto”
    这种方式也已经被废弃。

  3. 最后就是使用tools和tool_choice的接口:可以实现并行调用,模型会返回一个tool_call数组,可以对调用进行并行处理。上面代码中使用的就是Tool接口

SDK的其他API

结构化输出

OpenAI的SDK提供了beta.chat.completions.parse 接口,配合 Pydantic 可以实现零解析错误的强类型输出。举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from pydantic import BaseModel
from enum import Enum

# 定义诊断结果的 Schema
class Severity(str, Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"

class LogDiagnosis(BaseModel):
error_code: str
severity: Severity
suggested_command: str
reasoning: str

ffmpeg_log = "Error: Connection timed out. RTMP server not reachable at rtmp://192.168.1.5/live"

completion = client.beta.chat.completions.parse(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是音视频专家,请分析日志并给出结构化诊断。"},
{"role": "user", "content": ffmpeg_log},
],
response_format=LogDiagnosis,
)

diagnosis = completion.choices[0].message.parsed

# 直接作为对象调用,无需正则或json.loads
print(f"Severity: {diagnosis.severity}")
print(f"Fix: {diagnosis.suggested_command}")

流式响应

为了优化体验,首字响应时间很重要,而决定这个因素一方面是API调用延迟,另一方面是Agent处理的延时,这里主要考虑使用SDK来处理agent的延时。SDK提供了stream = true参数,在处理工具调用时需要进行拼接chunck,但是回复生成阶段带来的体验提升是很大的。

1
2
3
4
5
6
7
8
9
10
11
response_stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "请解释一下 HLS 和 RTMP 协议的区别"}],
stream=True, # 开启流式
)

print(Agent: , end="", flush=True)
for chunk in response_stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)

多模态能力

可以通过添加image_url参数来配置图片接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 假设我们要让 Agent 检查直播画面是否黑屏或有伪影
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "这张是推流端的截图,画面正常吗?"},
{
"type": "image_url",
"image_url": {
"url": "[https://example.com/stream_snapshot.jpg](https://example.com/stream_snapshot.jpg)",
"detail": "low" # low 模式更省 token,适合简单判断
}
}
],
}
],
)
print(response.choices[0].message.content)

LangChain/LangGraph

当然了框架肯定不仅限于LAngchain团队开发的,还有AutoGen、CrewAI、smolagents、OpenAI Swarm 和 OpenManus等等。只是LAngChain的接口更加通用,并且目前我的应用场景还没有溢出LangChain的能力。但还是综合网上文章讨论一下几种框架的不同,传统LangChain使用ReAct,LangGraph使用图,具体在后面说。AutoGen似乎对subAgent的支持更好,因为提供了一个异步消息机制,这更有利于多个agent的通信。OpenManus 并未采用端到端的模型微调方案,而是强调 上下文驱动(In-Context Learning) 的 Agent 架构设计。其核心思想是充分利用先进大模型已有的推理与泛化能力,通过提示词工程、上下文组织和多轮交互,让 Agent 快速适配新任务

相较于Raw SDK, LangChain框架统一了接口,提供了持久化存储,包括checkpoint和threadID接口,能够实现历史回溯。但是Langchain的封装太高。例如使用creatAgent一个接口完成了整个agent创建,因此这里解释更加自由的Langgraph。SDK 中的 while 循环虽然简单,但难以持久化。LangGraph 将流程定义为 Nodes(节点/动作) 和 Edges(边/逻辑跳转)。具体可以看一下文档,Langchaint团队还开发了很多Agent的接口特性。下面聊一些目前我用到的,例如Human in the loop等。

下面是一段使用langGraph改造的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import json
import operator
from typing import Annotated, TypedDict, Union, List

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langgraph.graph import StateGraph, END, START
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver


@tool
def start_stream(stream_id: str, protocol: str):
"""Start a stream with given stream_id and protocol (rtmp/hls only)."""
if protocol not in ['rtmp', 'hls']:
return json.dumps({"error": "Unsupported protocol"})
return json.dumps({"stream_id": stream_id, "protocol": protocol, "status": "starting"})

@tool
def stop_stream(stream_id: str):
"""Stop a running stream."""
return json.dumps({"msg": f"stream_{stream_id} has been stopped."})

@tool
def get_stream_status(stream_id: str):
"""Get technical status (bitrate, fps) of a stream."""
if stream_id == "news_channel":
return json.dumps({"bitrate": 500, "status": "unstable", "fps": 10})
elif stream_id == "sports_channel":
return json.dumps({"bitrate": 4000, "status": "live", "fps": 60})
else:
return json.dumps({"error": "offline"})

tools = [start_stream, stop_stream, get_stream_status]


# Annotated[list, operator.add]:新的消息会 "Append" 到列表,而不是覆盖
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]


llm = ChatOpenAI(
model="gpt-4o-mini",
api_key=,
base_url=
)
llm_with_tools = llm.bind_tools(tools)

def agent_node(state: AgentState):
"""思考节点:调用 LLM 决定下一步"""
messages = state["messages"]
response = llm_with_tools.invoke(messages)
# 返回新的消息(LangGraph 会自动将其 append 到 state)
return {"messages": [response]}


tool_node = ToolNode(tools)

# 定义图
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)

# 定义边 (Edges)
workflow.add_edge(START, "agent") # 入口 -> 思考

# 定义条件边 (Conditional Edges)
# 逻辑:如果 agent 输出包含 tool_calls,跳转到 "tools" 节点,否则结束
def should_continue(state: AgentState):
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tools"
return END

workflow.add_conditional_edges(
"agent",
should_continue,
["tools", END]
)

# 工具执行完后,必须跳回 agent 继续思考
workflow.add_edge("tools", "agent")

# memory=checkpointer 开启了持久化记忆
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)

# 模拟运行
if __name__ == "__main__":
# 配置线程 ID,模拟不同用户的会话
config = {"configurable": {"thread_id": "user_session_001"}}

print("--- 任务:复杂流控推理 ---")
user_input = "检查一下 news_channel 的状态,如果它不稳定或者码率低于 1000,就把它重启一下。"

# 这一步包含了整个 While 循环 + 状态管理
# Stream 模式允许我们看到中间步骤
for event in app.stream(
{"messages": [HumanMessage(content=user_input)]},
config=config
):
for key, value in event.items():
print(f"\n[Node: {key}]")
# 打印最新的消息内容
last_msg = value["messages"][-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
print(f" -> 计划调用: {last_msg.tool_calls[0]['name']}")
elif hasattr(last_msg, "content"):
print(f" -> 内容: {last_msg.content}")

print("\n\n====== 最终结果 ======")
# 由于有 checkpointer,我们可以随时获取当前状态
final_state = app.get_state(config)
print(final_state.values["messages"][-1].content)

一些API

human in the loop

通过 interrupt_before 或 interrupt_after 来暂停图的执行,等待人工确认或修改状态。实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# ... (前面的代码保持不变) ...

# 1. 修改编译步骤:增加 interrupt_before
# 这意味着在进入 "tools" 节点前,程序会暂停
app = workflow.compile(
checkpointer=checkpointer,
interrupt_before=["tools"] # <--- 关键修改
)

if __name__ == "__main__":
config = {"configurable": {"thread_id": "user_session_hitl_01"}}

print("--- 阶段1:触发任务 ---")
user_input = "news_channel 好像卡了,帮我重启一下。"

# 第一次运行:Agent 会思考并决定调用工具,但会在执行前暂停
for event in app.stream({"messages": [HumanMessage(content=user_input)]}, config=config):
for key, value in event.items():
print(f"[Node: {key}] 正在处理...")

# 此时,程序暂停了。我们可以检查 Agent 到底想干什么
snapshot = app.get_state(config)
next_step = snapshot.next

if "tools" in next_step:
last_msg = snapshot.values["messages"][-1]
tool_call = last_msg.tool_calls[0]
print(f"\n[系统暂停] Agent 申请执行: {tool_call['name']} 参数: {tool_call['args']}")

# 模拟人工审批
approval = input(">>> 是否批准执行?(y/n): ")

if approval.lower() == 'y':
print("--- 阶段2:批准并继续 ---")
# 传入 None 表示继续执行当前状态,不添加新消息
for event in app.stream(None, config=config):
for key, value in event.items():
print(f"[Node: {key}] {value}")
else:
print("--- 阶段2:拒绝并告知 Agent ---")
# 我们可以直接注入一条 ToolMessage 表示拒绝,或者注入一条 HumanMessage 告诉它不行
app.update_state(
config,
{"messages": [HumanMessage(content="管理员拒绝了重启操作,请检查是否有其他非破坏性方案。")]}
)
# 继续运行,Agent 会收到拒绝信息并重新思考
for event in app.stream(None, config=config):
print(f"[Node: {key}] {value}")

checkpoint与threadID

在Langgraph中通过快照策略将每个操作保存为一个checkpoint,将每一个会话保存为一个threadID,可以通过数据库以及序列化保存这个快照,并设计逻辑实现查找历史会话或者回溯历史checkpoint。而当返回某个checkpoint进行新的操作时,会被分支到一个新的checkpoints,而不是直接覆盖。关于threadID和checkpoint的过滤、排序等策略,都可以在数据库中实现。

subagent

在 LangGraph 中实现层级 Agent 的思路,可以定义另一个 StateGraph (比如 stream_monitor_graph),编译成 monitor_app,然后把它作为一个 Node 放入你的主图 workflow 中。

上下文压缩

虽然 operator.add 导致了状态的无限增长,但 LangGraph 并非束手无策。对于简单的场景,我们在 Node 内部使用 trim_messages 进行基于 Token 的动态截断;而对于需要长期记忆的场景,我们可以设计一个专门的 SummarizeNode,利用 LangGraph 特有的 RemoveMessage 接口,定期对 State 进行上下文压缩。
一个简单的sumarizeNode设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from langchain_core.messages import RemoveMessage, SystemMessage, HumanMessage

def summarize_node(state: AgentState):
summary_prompt = "请总结以上对话的关键信息..."
messages = state["messages"]

# 调用 LLM 生成总结
summary = llm.invoke(messages + [HumanMessage(content=summary_prompt)])

# 找出除了最新的 2 条消息以外的所有旧消息 ID
# 我们保留最后 2 条是为了上下文连贯性
messages_to_delete = [m.id for m in messages[:-2]]

# 构造删除操作
delete_operations = [RemoveMessage(id=mid) for mid in messages_to_delete]

# 将总结作为新的 SystemMessage 插入(或者作为一段上下文)
new_summary_msg = SystemMessage(content=f"Background Summary: {summary.content}")

# 返回:删除旧的 + 插入新的总结
return {"messages": [new_summary_msg] + delete_operations}

# 需要在 workflow 中添加条件边
def should_summarize(state: AgentState):
if len(state["messages"]) > 10:
return "summarize"
return END # 或者继续其他逻辑

或者使用官方的trim工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from langchain_core.messages import trim_messages

# 定义裁剪器
trimmer = trim_messages(
max_tokens=2000,
strategy="last",
token_counter=llm, # 使用模型的 tokenizer 计数
include_system=True, # 总是保留 System Prompt
allow_partial=False,
start_on="human" # 确保截断后的第一条是人类消息(避免从中间的 tool 切断)
)

def agent_node(state: AgentState):
# 在调用 LLM 前先过一遍 trimmer
trimmed_messages = trimmer.invoke(state["messages"])
response = llm_with_tools.invoke(trimmed_messages)
return {"messages": [response]}

Skill策略

以上都是使用tool_call设计的Agent架构,同样的skill也是基于toolcall,只是不同于ReAct(推理+调用),skill通过分层设计skill.md实现了一种渐进式加载的策略,这可以非常明显的避免上下文溢出,同时skill的集成也很简单,只需要添加skill.md即可实现热重载。关于一个简单的skill.md设计,以及其集成见下面实例。关于skill设计可以看官方博客

加载策略

首先构建一个Langgraph节点用来加载skill集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import os
import yaml
from typing import Annotated, TypedDict, List
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, START
from langchain_core.messages import BaseMessage, SystemMessage
from langgraph.prebuilt import ToolNode
import operator

# --- 1. 模拟动态加载器 (核心所在) ---
# 这个类每次调用都会去读取文件,实现了 "Hot Reload"
class SkillRegistry:
def __init__(self, skill_dir="./skills", tool_map=None):
self.skill_dir = skill_dir
self.tool_map = tool_map or {}

def get_all_skill_names(self):
"""每次调用都重新扫描目录,实现动态发现"""
return [f.replace('.md', '') for f in os.listdir(self.skill_dir) if f.endswith('.md')]

def load_skill_config(self, skill_name):
"""实时读取文件内容"""
path = os.path.join(self.skill_dir, f"{skill_name}.md")
if not os.path.exists(path):
return None # 或者加载默认通用的 skill

with open(path, 'r', encoding='utf-8') as f:
# 简单解析 Frontmatter (略去复杂正则,假设格式标准)
content = f.read()
parts = content.split('---', 2)
meta = yaml.safe_load(parts[1])
prompt = parts[2].strip()

# 动态筛选工具
tools = [self.tool_map[t] for t in meta.get('tools', []) if t in self.tool_map]
return {"prompt": prompt, "tools": tools}

# --- 2. 定义 State ---
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]
current_skill: str # 关键:用这个变量控制加载什么技能

# --- 3. 节点定义 ---

# 实例化加载器
registry = SkillRegistry(tool_map=all_tools_map)

def router_node(state: AgentState):
"""
路由节点:根据用户意图选择技能。
注意:这里每次都获取最新的技能列表。
"""
available_skills = registry.get_all_skill_names()
messages = state["messages"]

# 简单的路由逻辑,实际可以使用 LLM 进行分类
# 为了演示,我们用简单的规则,或者让 LLM 从 available_skills 中选一个
system_prompt = f"你是路由助手。当前可用技能:{available_skills}。请根据用户输入返回最合适的技能名称,只返回名称。"

# 这里用一个小模型做路由
router_llm = ChatOpenAI(model="gpt-4o-mini")
response = router_llm.invoke([SystemMessage(content=system_prompt)] + messages[-1:])

chosen_skill = response.content.strip()
if chosen_skill not in available_skills:
chosen_skill = "general_chat" # 默认技能

print(f"🔄 Router: 动态路由到 -> {chosen_skill}")
return {"current_skill": chosen_skill}

def skill_executor_node(state: AgentState):
"""
通用执行节点:根据 current_skill 变成不同的样子
"""
skill_name = state["current_skill"]

# === 关键点:热加载发生在这里 ===
# 每次请求进入此节点,都会重新读取 .md 文件
# 如果你刚修改了 .md 的 prompt,这里立即生效
config = registry.load_skill_config(skill_name)

if not config:
return {"messages": [BaseMessage(content="Skill configuration error.")]}

# 动态构建针对当前 Skill 的 LLM
llm = ChatOpenAI(model="gpt-4o")

# 1. 替换 System Prompt
system_msg = SystemMessage(content=config["prompt"])

# 2. 绑定特定的 Tools
llm_with_tools = llm.bind_tools(config["tools"])

# 3. 构造特定的 Message 历史(确保 System Prompt 是最新的)
# 这里的逻辑可以优化,比如把旧的 SystemMessage 替换掉
messages = [system_msg] + state["messages"]

response = llm_with_tools.invoke(messages)
return {"messages": [response]}

# --- 4. 构建图 (结构是固定的,但能力是动态的) ---
workflow = StateGraph(AgentState)

workflow.add_node("router", router_node)
workflow.add_node("skill_agent", skill_executor_node)
workflow.add_node("tools", ToolNode(list(all_tools_map.values()))) # ToolNode 包含全量工具即可,LLM 只会看到部分

workflow.add_edge(START, "router")
workflow.add_edge("router", "skill_agent")

def should_continue(state: AgentState):
last_msg = state["messages"][-1]
if last_msg.tool_calls:
return "tools"
return END

workflow.add_conditional_edges("skill_agent", should_continue, ["tools", END])
workflow.add_edge("tools", "skill_agent") # 工具执行完回到 skill_agent 继续推理

app = workflow.compile()

skill.md设计

linuxdo上有一篇文章写的很好,以下是一个案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
---
# === 元数据区 (给系统/Router看) ===
name: stream_diagnosis_expert
version: 1.0.2
author: ops_team
description: >-
专门用于诊断直播流卡顿、音画不同步、黑屏等质量问题。
当用户反馈观看体验不佳、报错或请求技术排查时,激活此技能。

# === 配置区 (给执行引擎看) ===
model_config:
temperature: 0.1 # 诊断类任务需要低创造性
max_tokens: 2000

# === 工具绑定区 (给 ToolBinder 看) ===
# 这里填写 Python 函数的字符串名称,加载时映射到真实函数,
tools:
- get_stream_probe_data # ffprobe 封装 see [get_stream_probe_data](get_stream_probe_data.md)
- check_cdn_health # CDN 状态查询 see [check_cdn_health](check_cdn_health.md)
- get_transcode_logs # 转码日志获取 see [get_transcode_logs](get_transcode_logs.md)
- restart_stream # 重启流 (高危操作,需谨慎) see [restart_stream](restart_stream.md)

# === 依赖/前置条件 (可选) ===
requires_context:
- stream_id
---

# Role
你是一名拥有10年经验的音视频流媒体运维专家。你的工作是根据各项指标数据,快速定位直播流故障的根因(Root Cause)。

# Constraints (关键约束)
1. **数据优先**:必须先调用工具获取数据(如 `get_stream_probe_data`),严禁在没有数据的情况下进行猜测。
2. **安全第一**:如果建议执行 `restart_stream`,必须先明确告知用户会有短暂的断流风险。
3. **格式规范**:涉及时间戳对比时,必须列出具体的 DTS/PTS 数值。

# Workflow (思维链)
1. 确认流 ID 是否存在。
2. 检查源流(Source)的健康状况。
3. 检查转码服务器日志是否有报错。
4. 综合判断是推流端问题、转码端问题还是 CDN 分发问题。

# Few-Shot Examples (少样本提示)
User: "我的流好像卡了,ID是 test_123"
Assistant: (调用 get_stream_probe_data...)
Assistant: "经检测,test_123 的源流帧率在过去1分钟内跌至 5fps(正常应为 25fps),且码率波动剧烈。这通常是推流端网络不稳定导致的。建议检查推流端的上行带宽。"

MCP Server

目前还没有做过,后面补充一下


本站由 Edison.Chen 创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。