希望能够在这篇文章中综合讨论搭建代理的各种方案,这里的代理定义并不局限于能力边界要求,而是从普通的工作流一直到大规模skill/框架搭建出的Agent。一个合理的Agent不应该盲目的增加复杂度,而应该根据任务的场景、ROI等等因素来衡量方案
工作流 对于工作流,市面上有很多成熟的框架例如n8n coze Dify等等,目前我只用过n8n,所做的工作也比较简单。主要原因是我的场景中并没有太多完全流程化的东西,而工作流的方案往往更适合与这些。
它的基本步骤就是首先要提出一个SOP的流程,然后选一个基模,我认为这里尤其应该注意的是,Deepseek是没有多模态能力的,而是通过OCR来实现图片识别,在搭建工作流时应该权衡经济和多模态能力,是否选择Deepseek。而搭建一个工作流的麻烦点就是各个节点的配置,举一个基础的例子就是搭建邮件工作流,如果是谷歌邮箱,其中的Oauth流程还是挺复杂的。有一个SOP流程+节点配置好,实际上一个工作流就不难搭建了。而对于不同的任务,需要在以上提到的框架中多找找,可能某个框架针对这个任务场景设计的节点要更加优雅和安全。
同时大多数工作流工具主要是DAG,虽然支持Loop,但是远远没有代码灵活。
SDK实现 对于RawSDK 搭配上一个BaseURL+API就能够实现一个Agent了,相比于一个重量级的框架(例如Langchain系列),如果搭建一个自由便捷,或者项目刚刚开始i,要开发一个MVP版本,使用SDK都是一个很好的选择,而且OpenAISDK也提供里越来越多的功能。下面是使用SDK+一个Loop循环实现的一个简单的代理。基本思路就是定义tool->定义schema->调用SDK绑定Tool以及生成Response->进入Loop->给出最终答案并退出
一个简单案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 import jsonfrom openai import OpenAIMAXSTEP = 10 def start_stream (stream_id: str , protocol: str ): if protocol not in ['rtmp' , 'hls' ]: return json.dumps({"error" : "Unsupported protocol" }) else : return json.dumps({"stream_id" : stream_id, "protocol" : protocol, "status" : "starting" }) def stop_stream (stream_id: str ): return json.dumps({"msg" : f"stream_{stream_id} has been stopped." }) def get_stream_status (stream_id: str ): if stream_id == "news_channel" : return json.dumps({"bitrate" : 500 , "status" : "unstable" , "fps" : 10 }) elif stream_id == "sports_channel" : return json.dumps({"bitrate" : 4000 , "status" : "live" , "fps" : 60 }) else : return json.dumps({"error" : "offline" }) tools_schema = [ { "type" : "function" , "function" : { "name" : "start_stream" , "description" : "Start a stream with given stream_id and protocol." , "parameters" : { "type" : "object" , "properties" : { "stream_id" : {"type" : "string" }, "protocol" : {"type" : "string" , "enum" : ["rtmp" , "hls" ]} }, "required" : ["stream_id" , "protocol" ] } } }, { "type" : "function" , "function" : { "name" : "stop_stream" , "description" : "Stop a running stream." , "parameters" : { "type" : "object" , "properties" : { "stream_id" : {"type" : "string" } }, "required" : ["stream_id" ] } } }, { "type" : "function" , "function" : { "name" : "get_stream_status" , "description" : "Get technical status (bitrate, fps) of a stream." , "parameters" : { "type" : "object" , "properties" : { "stream_id" : {"type" : "string" } }, "required" : ["stream_id" ] } } } ] available_tools = { "start_stream" : start_stream, "stop_stream" : stop_stream, "get_stream_status" : get_stream_status } class StreamOpsAgent : def __init__ (self ): self.client = OpenAI( api_key= base_url= ) self.max_steps = MAXSTEP def run (self, user_input ): messages = [{"role" : "user" , "content" : user_input}] print (f"UserInput: {user_input} " ) step_count = 0 while step_count < self.max_steps: step_count += 1 response = self.client.chat.completions.create( model="gpt-4o-mini" , messages=messages, tools=tools_schema, tool_choice="auto" ) response_message = response.choices[0 ].message tool_calls = response_message.tool_calls if tool_calls: messages.append(response_message) for tool_call in tool_calls: function_name = tool_call.function.name function_args = json.loads(tool_call.function.arguments) tool_call_id = tool_call.id print (f" -> [Tool Call] {function_name} Args: {function_args} " ) if function_name in available_tools: tool_function = available_tools[function_name] try : tool_output = tool_function(**function_args) except Exception as e: tool_output = json.dumps({"error" : str (e)}) else : tool_output = json.dumps({"error" : "Tool not found" }) if not isinstance (tool_output, str ): tool_output = json.dumps(tool_output) messages.append({ "role" : "tool" , "tool_call_id" : tool_call_id, "name" : function_name, "content" : tool_output }) else : print ("Final Response Generated." ) return response_message.content return "Error: Maximum steps exceeded." if __name__ == "__main__" : agent = StreamOpsAgent() print ("\n--- Test Case 1 ---" ) user_query1 = "帮我用 RTMP 协议启动一个叫 test_live 的流。" result = agent.run(user_query1) print ("Agent Result:" , result) print ("\n--- Test Case 2 (Complex) ---" ) user_query2 = "检查一下 news_channel 的状态,如果它不稳定(unstable)或者码率低于 1000,就把它重启一下." result2 = agent.run(user_query2) print ("Agent Result:" , result2)
需要注意的是对于SDK调用有三个阶段,
首先是很原生的使用Prompt+Regex:在 Prompt 里写死:“如果是要查询天气,请回复 Action: get_weather”,然后通过正则解析得到Tool调用->极不稳定
Functions API:使用Function+Function_call来对已经微调过的模型进行调用,尽管模型会针对函数专门生成json便于解析,但是这种方式不支持并行,其接口是在response中定义
functions=tools, function_call=”auto” 这种方式也已经被废弃。
最后就是使用tools和tool_choice的接口:可以实现并行调用,模型会返回一个tool_call数组,可以对调用进行并行处理。上面代码中使用的就是Tool接口
SDK的其他API 结构化输出 OpenAI的SDK提供了beta.chat.completions.parse 接口,配合 Pydantic 可以实现零解析错误的强类型输出。举例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from pydantic import BaseModelfrom enum import Enumclass Severity (str , Enum): INFO = "info" WARNING = "warning" CRITICAL = "critical" class LogDiagnosis (BaseModel ): error_code: str severity: Severity suggested_command: str reasoning: str ffmpeg_log = "Error: Connection timed out. RTMP server not reachable at rtmp://192.168.1.5/live" completion = client.beta.chat.completions.parse( model="gpt-4o" , messages=[ {"role" : "system" , "content" : "你是音视频专家,请分析日志并给出结构化诊断。" }, {"role" : "user" , "content" : ffmpeg_log}, ], response_format=LogDiagnosis, ) diagnosis = completion.choices[0 ].message.parsed print (f"Severity: {diagnosis.severity} " ) print (f"Fix: {diagnosis.suggested_command} " )
流式响应 为了优化体验,首字响应时间很重要,而决定这个因素一方面是API调用延迟,另一方面是Agent处理的延时,这里主要考虑使用SDK来处理agent的延时。SDK提供了stream = true 参数,在处理工具调用时需要进行拼接chunck,但是回复生成阶段带来的体验提升是很大的。
1 2 3 4 5 6 7 8 9 10 11 response_stream = client.chat.completions.create( model="gpt-4o-mini" , messages=[{"role" : "user" , "content" : "请解释一下 HLS 和 RTMP 协议的区别" }], stream=True , ) print (Agent: , end="" , flush=True )for chunk in response_stream: if chunk.choices[0 ].delta.content: content = chunk.choices[0 ].delta.content print (content, end="" , flush=True )
多模态能力 可以通过添加image_url参数来配置图片接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 response = client.chat.completions.create( model="gpt-4o" , messages=[ { "role" : "user" , "content" : [ {"type" : "text" , "text" : "这张是推流端的截图,画面正常吗?" }, { "type" : "image_url" , "image_url" : { "url" : "[https://example.com/stream_snapshot.jpg](https://example.com/stream_snapshot.jpg)" , "detail" : "low" } } ], } ], ) print (response.choices[0 ].message.content)
LangChain/LangGraph 当然了框架肯定不仅限于LAngchain团队开发的,还有AutoGen、CrewAI、smolagents、OpenAI Swarm 和 OpenManus等等。只是LAngChain的接口更加通用,并且目前我的应用场景还没有溢出LangChain的能力。但还是综合网上文章讨论一下几种框架的不同,传统LangChain使用ReAct,LangGraph使用图,具体在后面说。AutoGen似乎对subAgent的支持更好,因为提供了一个异步消息机制,这更有利于多个agent的通信。OpenManus 并未采用端到端的模型微调方案,而是强调 上下文驱动(In-Context Learning) 的 Agent 架构设计。其核心思想是充分利用先进大模型已有的推理与泛化能力,通过提示词工程、上下文组织和多轮交互,让 Agent 快速适配新任务
相较于Raw SDK, LangChain框架统一了接口,提供了持久化存储,包括checkpoint和threadID接口,能够实现历史回溯。但是Langchain的封装太高。例如使用creatAgent一个接口完成了整个agent创建,因此这里解释更加自由的Langgraph。SDK 中的 while 循环虽然简单,但难以持久化。LangGraph 将流程定义为 Nodes(节点/动作) 和 Edges(边/逻辑跳转)。具体可以看一下文档 ,Langchaint团队还开发了很多Agent的接口特性。下面聊一些目前我用到的,例如Human in the loop等。
下面是一段使用langGraph改造的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 import jsonimport operatorfrom typing import Annotated, TypedDict, Union , List from langchain_openai import ChatOpenAIfrom langchain_core.tools import toolfrom langchain_core.messages import BaseMessage, HumanMessage, SystemMessagefrom langgraph.graph import StateGraph, END, STARTfrom langgraph.prebuilt import ToolNodefrom langgraph.checkpoint.memory import MemorySaver@tool def start_stream (stream_id: str , protocol: str ): """Start a stream with given stream_id and protocol (rtmp/hls only).""" if protocol not in ['rtmp' , 'hls' ]: return json.dumps({"error" : "Unsupported protocol" }) return json.dumps({"stream_id" : stream_id, "protocol" : protocol, "status" : "starting" }) @tool def stop_stream (stream_id: str ): """Stop a running stream.""" return json.dumps({"msg" : f"stream_{stream_id} has been stopped." }) @tool def get_stream_status (stream_id: str ): """Get technical status (bitrate, fps) of a stream.""" if stream_id == "news_channel" : return json.dumps({"bitrate" : 500 , "status" : "unstable" , "fps" : 10 }) elif stream_id == "sports_channel" : return json.dumps({"bitrate" : 4000 , "status" : "live" , "fps" : 60 }) else : return json.dumps({"error" : "offline" }) tools = [start_stream, stop_stream, get_stream_status] class AgentState (TypedDict ): messages: Annotated[List [BaseMessage], operator.add] llm = ChatOpenAI( model="gpt-4o-mini" , api_key=, base_url= ) llm_with_tools = llm.bind_tools(tools) def agent_node (state: AgentState ): """思考节点:调用 LLM 决定下一步""" messages = state["messages" ] response = llm_with_tools.invoke(messages) return {"messages" : [response]} tool_node = ToolNode(tools) workflow = StateGraph(AgentState) workflow.add_node("agent" , agent_node) workflow.add_node("tools" , tool_node) workflow.add_edge(START, "agent" ) def should_continue (state: AgentState ): last_message = state["messages" ][-1 ] if last_message.tool_calls: return "tools" return END workflow.add_conditional_edges( "agent" , should_continue, ["tools" , END] ) workflow.add_edge("tools" , "agent" ) checkpointer = MemorySaver() app = workflow.compile (checkpointer=checkpointer) if __name__ == "__main__" : config = {"configurable" : {"thread_id" : "user_session_001" }} print ("--- 任务:复杂流控推理 ---" ) user_input = "检查一下 news_channel 的状态,如果它不稳定或者码率低于 1000,就把它重启一下。" for event in app.stream( {"messages" : [HumanMessage(content=user_input)]}, config=config ): for key, value in event.items(): print (f"\n[Node: {key} ]" ) last_msg = value["messages" ][-1 ] if hasattr (last_msg, "tool_calls" ) and last_msg.tool_calls: print (f" -> 计划调用: {last_msg.tool_calls[0 ]['name' ]} " ) elif hasattr (last_msg, "content" ): print (f" -> 内容: {last_msg.content} " ) print ("\n\n====== 最终结果 ======" ) final_state = app.get_state(config) print (final_state.values["messages" ][-1 ].content)
一些API human in the loop 通过 interrupt_before 或 interrupt_after 来暂停图的执行,等待人工确认或修改状态。实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 app = workflow.compile ( checkpointer=checkpointer, interrupt_before=["tools" ] ) if __name__ == "__main__" : config = {"configurable" : {"thread_id" : "user_session_hitl_01" }} print ("--- 阶段1:触发任务 ---" ) user_input = "news_channel 好像卡了,帮我重启一下。" for event in app.stream({"messages" : [HumanMessage(content=user_input)]}, config=config): for key, value in event.items(): print (f"[Node: {key} ] 正在处理..." ) snapshot = app.get_state(config) next_step = snapshot.next if "tools" in next_step: last_msg = snapshot.values["messages" ][-1 ] tool_call = last_msg.tool_calls[0 ] print (f"\n[系统暂停] Agent 申请执行: {tool_call['name' ]} 参数: {tool_call['args' ]} " ) approval = input (">>> 是否批准执行?(y/n): " ) if approval.lower() == 'y' : print ("--- 阶段2:批准并继续 ---" ) for event in app.stream(None , config=config): for key, value in event.items(): print (f"[Node: {key} ] {value} " ) else : print ("--- 阶段2:拒绝并告知 Agent ---" ) app.update_state( config, {"messages" : [HumanMessage(content="管理员拒绝了重启操作,请检查是否有其他非破坏性方案。" )]} ) for event in app.stream(None , config=config): print (f"[Node: {key} ] {value} " )
checkpoint与threadID 在Langgraph中通过快照策略将每个操作保存为一个checkpoint,将每一个会话保存为一个threadID,可以通过数据库以及序列化保存这个快照,并设计逻辑实现查找历史会话或者回溯历史checkpoint。而当返回某个checkpoint进行新的操作时,会被分支到一个新的checkpoints,而不是直接覆盖。关于threadID和checkpoint的过滤、排序等策略,都可以在数据库中实现。
subagent 在 LangGraph 中实现层级 Agent 的思路,可以定义另一个 StateGraph (比如 stream_monitor_graph),编译成 monitor_app,然后把它作为一个 Node 放入你的主图 workflow 中。
上下文压缩 虽然 operator.add 导致了状态的无限增长,但 LangGraph 并非束手无策。对于简单的场景,我们在 Node 内部使用 trim_messages 进行基于 Token 的动态截断;而对于需要长期记忆的场景,我们可以设计一个专门的 SummarizeNode,利用 LangGraph 特有的 RemoveMessage 接口,定期对 State 进行上下文压缩。 一个简单的sumarizeNode设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from langchain_core.messages import RemoveMessage, SystemMessage, HumanMessagedef summarize_node (state: AgentState ): summary_prompt = "请总结以上对话的关键信息..." messages = state["messages" ] summary = llm.invoke(messages + [HumanMessage(content=summary_prompt)]) messages_to_delete = [m.id for m in messages[:-2 ]] delete_operations = [RemoveMessage(id =mid) for mid in messages_to_delete] new_summary_msg = SystemMessage(content=f"Background Summary: {summary.content} " ) return {"messages" : [new_summary_msg] + delete_operations} def should_summarize (state: AgentState ): if len (state["messages" ]) > 10 : return "summarize" return END
或者使用官方的trim工具
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from langchain_core.messages import trim_messagestrimmer = trim_messages( max_tokens=2000 , strategy="last" , token_counter=llm, include_system=True , allow_partial=False , start_on="human" ) def agent_node (state: AgentState ): trimmed_messages = trimmer.invoke(state["messages" ]) response = llm_with_tools.invoke(trimmed_messages) return {"messages" : [response]}
Skill策略 以上都是使用tool_call设计的Agent架构,同样的skill也是基于toolcall,只是不同于ReAct(推理+调用),skill通过分层设计skill.md实现了一种渐进式加载的策略,这可以非常明显的避免上下文溢出,同时skill的集成也很简单,只需要添加skill.md即可实现热重载。关于一个简单的skill.md设计,以及其集成见下面实例。关于skill设计可以看官方博客
加载策略 首先构建一个Langgraph节点用来加载skill集合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 import osimport yamlfrom typing import Annotated, TypedDict, List from langchain_openai import ChatOpenAIfrom langgraph.graph import StateGraph, END, STARTfrom langchain_core.messages import BaseMessage, SystemMessagefrom langgraph.prebuilt import ToolNodeimport operatorclass SkillRegistry : def __init__ (self, skill_dir="./skills" , tool_map=None ): self.skill_dir = skill_dir self.tool_map = tool_map or {} def get_all_skill_names (self ): """每次调用都重新扫描目录,实现动态发现""" return [f.replace('.md' , '' ) for f in os.listdir(self.skill_dir) if f.endswith('.md' )] def load_skill_config (self, skill_name ): """实时读取文件内容""" path = os.path.join(self.skill_dir, f"{skill_name} .md" ) if not os.path.exists(path): return None with open (path, 'r' , encoding='utf-8' ) as f: content = f.read() parts = content.split('---' , 2 ) meta = yaml.safe_load(parts[1 ]) prompt = parts[2 ].strip() tools = [self.tool_map[t] for t in meta.get('tools' , []) if t in self.tool_map] return {"prompt" : prompt, "tools" : tools} class AgentState (TypedDict ): messages: Annotated[List [BaseMessage], operator.add] current_skill: str registry = SkillRegistry(tool_map=all_tools_map) def router_node (state: AgentState ): """ 路由节点:根据用户意图选择技能。 注意:这里每次都获取最新的技能列表。 """ available_skills = registry.get_all_skill_names() messages = state["messages" ] system_prompt = f"你是路由助手。当前可用技能:{available_skills} 。请根据用户输入返回最合适的技能名称,只返回名称。" router_llm = ChatOpenAI(model="gpt-4o-mini" ) response = router_llm.invoke([SystemMessage(content=system_prompt)] + messages[-1 :]) chosen_skill = response.content.strip() if chosen_skill not in available_skills: chosen_skill = "general_chat" print (f"🔄 Router: 动态路由到 -> {chosen_skill} " ) return {"current_skill" : chosen_skill} def skill_executor_node (state: AgentState ): """ 通用执行节点:根据 current_skill 变成不同的样子 """ skill_name = state["current_skill" ] config = registry.load_skill_config(skill_name) if not config: return {"messages" : [BaseMessage(content="Skill configuration error." )]} llm = ChatOpenAI(model="gpt-4o" ) system_msg = SystemMessage(content=config["prompt" ]) llm_with_tools = llm.bind_tools(config["tools" ]) messages = [system_msg] + state["messages" ] response = llm_with_tools.invoke(messages) return {"messages" : [response]} workflow = StateGraph(AgentState) workflow.add_node("router" , router_node) workflow.add_node("skill_agent" , skill_executor_node) workflow.add_node("tools" , ToolNode(list (all_tools_map.values()))) workflow.add_edge(START, "router" ) workflow.add_edge("router" , "skill_agent" ) def should_continue (state: AgentState ): last_msg = state["messages" ][-1 ] if last_msg.tool_calls: return "tools" return END workflow.add_conditional_edges("skill_agent" , should_continue, ["tools" , END]) workflow.add_edge("tools" , "skill_agent" ) app = workflow.compile ()
skill.md设计 linuxdo上有一篇文章 写的很好,以下是一个案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 --- # === 元数据区 (给系统/Router看) === name: stream_diagnosis_ expert version: 1.0.2 author: ops_team description: >- 专门用于诊断直播流卡顿、音画不同步、黑屏等质量问题。 当用户反馈观看体验不佳、报错或请求技术排查时,激活此技能。 # === 配置区 (给执行引擎看) === model_ config: temperature: 0.1 # 诊断类任务需要低创造性 max_tokens: 2000 # === 工具绑定区 (给 ToolBinder 看) === # 这里填写 Python 函数的字符串名称,加载时映射到真实函数, tools: - get_ stream_probe_ data # ffprobe 封装 see [get_stream_probe_data ](get_stream_probe_data.md ) - check_cdn_ health # CDN 状态查询 see [check_cdn_health ](check_cdn_health.md ) - get_transcode_ logs # 转码日志获取 see [get_transcode_logs ](get_transcode_logs.md ) - restart_stream # 重启流 (高危操作,需谨慎) see [restart_stream ](restart_stream.md ) # === 依赖/前置条件 (可选) === requires_ context: - stream_id --- # Role 你是一名拥有10年经验的音视频流媒体运维专家。你的工作是根据各项指标数据,快速定位直播流故障的根因(Root Cause)。 # Constraints (关键约束) 1. **数据优先** :必须先调用工具获取数据(如 `get_ stream_probe_ data`),严禁在没有数据的情况下进行猜测。2. **安全第一** :如果建议执行 `restart_stream`,必须先明确告知用户会有短暂的断流风险。 3. **格式规范** :涉及时间戳对比时,必须列出具体的 DTS/PTS 数值。 # Workflow (思维链) 1. 确认流 ID 是否存在。 2. 检查源流(Source)的健康状况。 3. 检查转码服务器日志是否有报错。 4. 综合判断是推流端问题、转码端问题还是 CDN 分发问题。 # Few-Shot Examples (少样本提示) User: "我的流好像卡了,ID是 test_ 123"Assistant: (调用 get_stream_ probe_data...) Assistant: "经检测,test_ 123 的源流帧率在过去1分钟内跌至 5fps(正常应为 25fps),且码率波动剧烈。这通常是推流端网络不稳定导致的。建议检查推流端的上行带宽。"
MCP Server 目前还没有做过,后面补充一下