马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
一个 Agent 干不完怎么办?
Java实今世码
- public class AgentTeamsSystem {
- // --- 配置 ---
- private static final Path WORKDIR = Paths.get(System.getProperty("user.dir"));
- private static final Path TEAM_DIR = WORKDIR.resolve(".team");
- private static final Path INBOX_DIR = TEAM_DIR.resolve("inbox");
- private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
-
- // 有效消息类型
- private static final Set<String> VALID_MSG_TYPES = Set.of(
- "message", "broadcast", "shutdown_request",
- "shutdown_response", "plan_approval_response"
- );
-
- // --- 消息系统(MessageBus)---
- static class MessageBus {
- private final Path inboxDir;
-
- public MessageBus(Path inboxDir) {
- this.inboxDir = inboxDir;
- try {
- Files.createDirectories(inboxDir);
- } catch (IOException e) {
- throw new RuntimeException("Failed to create inbox directory", e);
- }
- }
-
- /**
- * 发送消息到指定智能体
- */
- public String send(String sender, String to, String content,
- String msgType, Map<String, Object> extra) {
- if (!VALID_MSG_TYPES.contains(msgType)) {
- return String.format("Error: Invalid type '%s'. Valid: %s",
- msgType, String.join(", ", VALID_MSG_TYPES));
- }
-
- Map<String, Object> message = new LinkedHashMap<>();
- message.put("type", msgType);
- message.put("from", sender);
- message.put("content", content);
- message.put("timestamp", System.currentTimeMillis() / 1000.0);
-
- if (extra != null) {
- message.putAll(extra);
- }
-
- Path inboxPath = inboxDir.resolve(to + ".jsonl");
- try {
- String jsonLine = gson.toJson(message) + "\n";
- Files.writeString(inboxPath, jsonLine,
- StandardOpenOption.CREATE, StandardOpenOption.APPEND);
-
- return String.format("Sent %s to %s", msgType, to);
- } catch (IOException e) {
- return "Error: " + e.getMessage();
- }
- }
-
- /**
- * 读取并清空邮箱
- */
- public List<Map<String, Object>> readInbox(String name) {
- Path inboxPath = inboxDir.resolve(name + ".jsonl");
- if (!Files.exists(inboxPath)) {
- return new ArrayList<>();
- }
-
- try {
- List<Map<String, Object>> messages = new ArrayList<>();
- List<String> lines = Files.readAllLines(inboxPath);
-
- for (String line : lines) {
- if (!line.trim().isEmpty()) {
- Type type = new TypeToken<Map<String, Object>>(){}.getType();
- Map<String, Object> message = gson.fromJson(line, type);
- messages.add(message);
- }
- }
-
- // 清空邮箱(消费模式)
- Files.writeString(inboxPath, "");
-
- return messages;
- } catch (IOException e) {
- return new ArrayList<>();
- }
- }
-
- /**
- * 广播消息到所有队友
- */
- public String broadcast(String sender, String content, List<String> teammates) {
- int count = 0;
- for (String name : teammates) {
- if (!name.equals(sender)) {
- send(sender, name, content, "broadcast");
- count++;
- }
- }
- return String.format("Broadcast to %d teammates", count);
- }
- }
-
- // 初始化消息总线
- private static final MessageBus BUS = new MessageBus(INBOX_DIR);
-
- // --- 智能体管理器(TeammateManager)---
- static class TeammateManager {
- private final Path teamDir;
- private final Path configPath;
- private Map<String, Object> config;
- private final Map<String, Thread> threads = new ConcurrentHashMap<>();
- private final Map<String, AtomicBoolean> stopFlags = new ConcurrentHashMap<>();
-
- public TeammateManager(Path teamDir) {
- this.teamDir = teamDir;
- this.configPath = teamDir.resolve("config.json");
- loadConfig();
- }
-
- @SuppressWarnings("unchecked")
- private void loadConfig() {
- try {
- if (Files.exists(configPath)) {
- String content = Files.readString(configPath);
- Type type = new TypeToken<Map<String, Object>>(){}.getType();
- this.config = gson.fromJson(content, type);
- } else {
- this.config = new HashMap<>();
- config.put("team_name", "default");
- config.put("members", new ArrayList<Map<String, Object>>());
- saveConfig();
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to load team config", e);
- }
- }
-
- @SuppressWarnings("unchecked")
- public String spawn(String name, String role, String prompt) {
- Map<String, Object> member = findMember(name);
-
- if (member != null) {
- String status = (String) member.get("status");
- if (!"idle".equals(status) && !"shutdown".equals(status)) {
- return String.format("Error: '%s' is currently %s", name, status);
- }
- member.put("status", "working");
- member.put("role", role);
- } else {
- member = new LinkedHashMap<>();
- member.put("name", name);
- member.put("role", role);
- member.put("status", "working");
- ((List<Map<String, Object>>) config.get("members")).add(member);
- }
-
- saveConfig();
-
- // 停止之前的线程(如果存在)
- if (threads.containsKey(name)) {
- stopFlags.get(name).set(true);
- try {
- threads.get(name).join(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- // 创建新的停止标志
- AtomicBoolean stopFlag = new AtomicBoolean(false);
- stopFlags.put(name, stopFlag);
-
- // 创建并启动新线程
- Thread thread = new Thread(() -> teammateLoop(name, role, prompt, stopFlag),
- "Teammate-" + name);
- thread.setDaemon(true);
- threads.put(name, thread);
- thread.start();
-
- return String.format("Spawned '%s' (role: %s)", name, role);
- }
-
- private void teammateLoop(String name, String role, String prompt, AtomicBoolean stopFlag) {
- String systemPrompt = String.format(
- "You are '%s', role: %s, at %s. " +
- "Use send_message to communicate. Complete your task.",
- name, role, WORKDIR
- );
-
- List<Map<String, Object>> messages = new ArrayList<>();
- messages.add(Map.of("role", "user", "content", prompt));
-
- // 最大迭代次数限制
- for (int i = 0; i < 50 && !stopFlag.get(); i++) {
- try {
- // 检查邮箱
- List<Map<String, Object>> inbox = BUS.readInbox(name);
- for (Map<String, Object> msg : inbox) {
- messages.add(Map.of("role", "user", "content", gson.toJson(msg)));
- }
-
- // 模拟调用 LLM
- Map<String, Object> response = simulateTeammateLLMCall(systemPrompt, messages, name);
-
- if (response == null || "end_turn".equals(response.get("stop_reason"))) {
- break;
- }
-
- // ... 执行工具调用
-
- // 短暂休眠,避免 CPU 过度使用
- Thread.sleep(100);
-
- } catch (Exception e) {
- System.err.printf("[%s] Error: %s%n", name, e.getMessage());
- break;
- }
- }
-
- // 更新状态
- @SuppressWarnings("unchecked")
- Map<String, Object> member = findMember(name);
- if (member != null && !"shutdown".equals(member.get("status"))) {
- member.put("status", "idle");
- saveConfig();
- }
-
- threads.remove(name);
- stopFlags.remove(name);
- }
-
- @SuppressWarnings("unchecked")
- public String listAll() {
- List<Map<String, Object>> members = (List<Map<String, Object>>) config.get("members");
- if (members.isEmpty()) {
- return "No teammates.";
- }
-
- StringBuilder sb = new StringBuilder();
- sb.append("Team: ").append(config.get("team_name")).append("\n");
-
- for (Map<String, Object> member : members) {
- sb.append(String.format(" %s (%s): %s%n",
- member.get("name"),
- member.get("role"),
- member.get("status")
- ));
- }
-
- return sb.toString().trim();
- }
-
- @SuppressWarnings("unchecked")
- public List<String> memberNames() {
- List<Map<String, Object>> members = (List<Map<String, Object>>) config.get("members");
- return members.stream()
- .map(m -> (String) m.get("name"))
- .collect(Collectors.toList());
- }
-
- /**
- * 获取活动成员数量
- */
- @SuppressWarnings("unchecked")
- public int getActiveCount() {
- List<Map<String, Object>> members = (List<Map<String, Object>>) config.get("members");
- int count = 0;
- for (Map<String, Object> member : members) {
- if ("working".equals(member.get("status"))) {
- count++;
- }
- }
- return count;
- }
- }
-
- // 初始化智能体管理器
- private static final TeammateManager TEAM_MANAGER = new TeammateManager(TEAM_DIR);
-
- // --- 工具枚举 ---
- public enum ToolType {
- BASH("bash", "Run a shell command."),
- READ_FILE("read_file", "Read file contents."),
- WRITE_FILE("write_file", "Write content to file."),
- EDIT_FILE("edit_file", "Replace exact text in file."),
- SPAWN_TEAMMATE("spawn_teammate", "Spawn a persistent teammate that runs in its own thread."), // 新增
- LIST_TEAMMATES("list_teammates", "List all teammates with name, role, status."), // 新增
- SEND_MESSAGE("send_message", "Send a message to a teammate's inbox."), // 新增
- READ_INBOX("read_inbox", "Read and drain the lead's inbox."), // 新增
- BROADCAST("broadcast", "Send a message to all teammates."); // 新增
- public final String name;
- public final String description;
- ToolType(String name, String description) { this.name = name; this.description = description; }
- }
- // --- 工具处理器映射 ---
- private static final Map<String, ToolExecutor> TOOL_HANDLERS = new HashMap<>();
-
- static {
- // ... 省略基础工具注册
-
- // 团队管理工具
- TOOL_HANDLERS.put(ToolType.SPAWN_TEAMMATE.name, args -> {
- String name = (String) args.get("name");
- String role = (String) args.get("role");
- String prompt = (String) args.get("prompt");
- return TEAM_MANAGER.spawn(name, role, prompt);
- });
-
- TOOL_HANDLERS.put(ToolType.LIST_TEAMMATES.name, args -> {
- return TEAM_MANAGER.listAll();
- });
-
- TOOL_HANDLERS.put(ToolType.SEND_MESSAGE.name, args -> {
- String to = (String) args.get("to");
- String content = (String) args.get("content");
- String msgType = (String) args.get("msg_type");
- if (msgType == null) msgType = "message";
- return BUS.send("lead", to, content, msgType);
- });
-
- TOOL_HANDLERS.put(ToolType.READ_INBOX.name, args -> {
- List<Map<String, Object>> inbox = BUS.readInbox("lead");
- return gson.toJson(inbox);
- });
-
- TOOL_HANDLERS.put(ToolType.BROADCAST.name, args -> {
- String content = (String) args.get("content");
- return BUS.broadcast("lead", content, TEAM_MANAGER.memberNames());
- });
- }
-
- // --- Agent 主循环(领导智能体)---
- public static void agentLoop(List<Map<String, Object>> messages) {
- while (true) {
- try {
- // 检查领导邮箱
- List<Map<String, Object>> inbox = BUS.readInbox("lead");
- if (!inbox.isEmpty()) {
- String inboxJson = gson.toJson(inbox);
- messages.add(Map.of(
- "role", "user",
- "content", "<inbox>" + inboxJson + "</inbox>"
- ));
-
- messages.add(Map.of(
- "role", "assistant",
- "content", "Noted inbox messages."
- ));
- // 邮箱自动注入:自动检查并注入收到的消息
- // 结构化格式:用XML标签包裹,便于LLM解析
- }
-
- // 显示团队状态
- int activeCount = TEAM_MANAGER.getActiveCount();
- if (activeCount > 0) {
- System.out.printf("[Active teammates: %d]%n", activeCount);
- }
-
- // ... 省略相同的 LLM 调用和工具执行逻辑
-
- } catch (Exception e) {
- System.err.println("Error in agent loop: " + e.getMessage());
- e.printStackTrace();
- return;
- }
- }
- }
- }
复制代码 这段代码引入了智能体团队体系,实现了多个 Agent 之间的协作
焦点头脑:恒久队友 + 异步邮箱。
什么是恒久队友(Persistent Teammates)?它们是恒久存活、有身份意识的 Agent,通过基于文件的邮箱(JSONL 格式)异步通讯,可以大概处置惩罚超过单个实验周期的复杂使命委托。
多智能体体系架构
焦点头脑:从单智能体体系升级为多智能体协作体系,引入分布式、脚色化、可通讯的智能体团队,实现复杂的协同工作流和分布式题目办理。- // 系统配置
- private static final Path TEAM_DIR = WORKDIR.resolve(".team");
- private static final Path INBOX_DIR = TEAM_DIR.resolve("inbox");
- // 团队持久化:.team目录存储团队配置
- // 消息传递:inbox子目录实现智能体间通信
- // 文件系统基础:通过文件系统实现简单的分布式通信
复制代码
- 多智能体协同:多个智能体可以并行工作,协同办理题目
- 脚色化分工:差异智能体担当差异脚色,专业化分工
- 恒久化团队:团队设置和状态可以恒久化生存
- 去中央化通讯:基于文件体系的轻量级消息通报
消息总线体系(MessageBus)
- // 消息总线 - 智能体间通信基础设施
- static class MessageBus {
- private final Path inboxDir;
- // 文件系统邮箱:每个智能体一个jsonl文件
- // 异步通信:发送方不阻塞,接收方主动拉取
- // 松耦合:智能体间通过邮箱解耦
-
- /**
- * 发送消息到指定智能体
- */
- public String send(String sender, String to, String content,
- String msgType, Map<String, Object> extra) {
- if (!VALID_MSG_TYPES.contains(msgType)) {
- return String.format("Error: Invalid type '%s'. Valid: %s",
- msgType, String.join(", ", VALID_MSG_TYPES));
- }
- // 消息类型验证:确保消息结构符合协议
-
- Map<String, Object> message = new LinkedHashMap<>();
- message.put("type", msgType);
- message.put("from", sender);
- message.put("content", content);
- message.put("timestamp", System.currentTimeMillis() / 1000.0);
- // 结构化消息:类型、发送方、内容、时间戳
- // 可扩展:支持额外字段
-
- Path inboxPath = inboxDir.resolve(to + ".jsonl");
- try {
- String jsonLine = gson.toJson(message) + "\n";
- Files.writeString(inboxPath, jsonLine,
- StandardOpenOption.CREATE, StandardOpenOption.APPEND);
- // 追加写入:支持多条消息
- // JSONL格式:每行一个JSON对象,便于处理
- }
- }
-
- /**
- * 读取并清空邮箱
- */
- public List<Map<String, Object>> readInbox(String name) {
- Path inboxPath = inboxDir.resolve(name + ".jsonl");
- if (!Files.exists(inboxPath)) {
- return new ArrayList<>();
- }
-
- try {
- List<Map<String, Object>> messages = new ArrayList<>();
- List<String> lines = Files.readAllLines(inboxPath);
-
- for (String line : lines) {
- if (!line.trim().isEmpty()) {
- Type type = new TypeToken<Map<String, Object>>(){}.getType();
- Map<String, Object> message = gson.fromJson(line, type);
- messages.add(message);
- }
- }
-
- // 清空邮箱(消费模式)
- Files.writeString(inboxPath, "");
- // 消费一次:消息被读取后清空,避免重复处理
- // 确保每个消息只被处理一次
-
- return messages;
- }
- }
- }
复制代码
- 异步通讯:发送和吸取解耦,不壅闭发送方
- 文件体系存储:简朴可靠,支持进程间通讯
- 布局化消息:明白的消息格式,支持多种消息范例
- 斲丧模式:读取后清空,克制消息重复处置惩罚
- 可扩展协议:通过msgType支持差异的通讯语义
智能体管理器(TeammateManager)
- // 智能体管理器 - 多智能体生命周期管理
- static class TeammateManager {
- private final Path teamDir;
- private final Path configPath;
- private Map<String, Object> config;
- private final Map<String, Thread> threads = new ConcurrentHashMap<>();
- private final Map<String, AtomicBoolean> stopFlags = new ConcurrentHashMap<>();
- // 配置管理:团队配置持久化到文件
- // 线程管理:每个智能体在自己的线程中运行
- // 停止控制:支持优雅停止智能体
-
- public String spawn(String name, String role, String prompt) {
- Map<String, Object> member = findMember(name);
-
- if (member != null) {
- String status = (String) member.get("status");
- if (!"idle".equals(status) && !"shutdown".equals(status)) {
- return String.format("Error: '%s' is currently %s", name, status);
- }
- member.put("status", "working");
- member.put("role", role);
- } else {
- member = new LinkedHashMap<>();
- member.put("name", name);
- member.put("role", role);
- member.put("status", "working");
- ((List<Map<String, Object>>) config.get("members")).add(member);
- }
- // 状态管理:智能体有明确的状态机
- // 重用支持:可以重用已有的智能体
- // 角色配置:为智能体分配特定角色
-
- saveConfig();
-
- // 创建新的停止标志
- AtomicBoolean stopFlag = new AtomicBoolean(false);
- stopFlags.put(name, stopFlag);
-
- // 创建并启动新线程
- Thread thread = new Thread(() -> teammateLoop(name, role, prompt, stopFlag),
- "Teammate-" + name);
- thread.setDaemon(true);
- threads.put(name, thread);
- thread.start();
- // 独立线程:每个智能体在独立线程中运行
- // 守护线程:不会阻止JVM退出
- // 命名线程:便于调试和监控
 -
- return String.format("Spawned '%s' (role: %s)", name, role);
- }
-
- private void teammateLoop(String name, String role, String prompt, AtomicBoolean stopFlag) {
- String systemPrompt = String.format(
- "You are '%s', role: %s, at %s. " +
- "Use send_message to communicate. Complete your task.",
- name, role, WORKDIR
- );
- // 个性化系统提示:为每个智能体定制角色
- // 明确角色:让智能体知道自己的身份和职责
-
- List<Map<String, Object>> messages = new ArrayList<>();
- messages.add(Map.of("role", "user", "content", prompt));
- // 初始化消息:从传入的prompt开始
-
- // 最大迭代次数限制
- for (int i = 0; i < 50 && !stopFlag.get(); i++) {
- try {
- // 检查邮箱
- List<Map<String, Object>> inbox = BUS.readInbox(name);
- for (Map<String, Object> msg : inbox) {
- messages.add(Map.of("role", "user", "content", gson.toJson(msg)));
- }
- // 邮箱检查:每次迭代前检查新消息
- // 消息注入:将收到的消息加入上下文
- // 持续通信:支持动态的任务调整
-
- // 短暂休眠,避免 CPU 过度使用
- Thread.sleep(100);
- // 节能设计:避免忙等待
- }
- }
-
- // 更新状态
- Map<String, Object> member = findMember(name);
- if (member != null && !"shutdown".equals(member.get("status"))) {
- member.put("status", "idle");
- saveConfig();
- }
- // 状态恢复:完成后状态恢复为idle
- // 配置持久化:状态变化立即保存
- }
- }
复制代码
- 生命周期管理:智能体的创建、运行、克制、烧毁
- 状态恒久化:智能体状态生存到文件,重启可规复
- 独立实验:每个智能体在自己的线程中独立运行
- 通讯集成:主动查抄邮箱,支持动态通讯
- 优雅克制:支持安全的克制机制
多智能体通讯工具集
- // 团队管理工具集
- public enum ToolType {
- SPAWN_TEAMMATE("spawn_teammate", "Spawn a persistent teammate that runs in its own thread."),
- LIST_TEAMMATES("list_teammates", "List all teammates with name, role, status."),
- SEND_MESSAGE("send_message", "Send a message to a teammate's inbox."),
- READ_INBOX("read_inbox", "Read and drain the lead's inbox."),
- BROADCAST("broadcast", "Send a message to all teammates.");
- // 团队创建:动态生成新的智能体
- // 状态查询:查看所有智能体状态
- // 点对点通信:向特定智能体发送消息
- // 广播通信:向所有智能体发送消息
- // 邮箱读取:获取收到的消息
- }
- // 工具处理器
- TOOL_HANDLERS.put(ToolType.SEND_MESSAGE.name, args -> {
- String to = (String) args.get("to");
- String content = (String) args.get("content");
- String msgType = (String) args.get("msg_type");
- if (msgType == null) msgType = "message";
- return BUS.send("lead", to, content, msgType);
- // 领导身份:所有消息都以"lead"身份发送
- // 灵活消息类型:支持不同类型的消息
- });
- TOOL_HANDLERS.put(ToolType.BROADCAST.name, args -> {
- String content = (String) args.get("content");
- return BUS.broadcast("lead", content, TEAM_MANAGER.memberNames());
- // 批量发送:向所有团队成员发送消息
- // 排除自己:广播不包含发送者自己
- });
复制代码
- 完备的通讯API:提供完备的智能体间通讯本领
- 向导-成员模式:明白的向导智能体控制整个团队
- 机动的通讯模式:支持点对点、广播、邮箱读取
- 与现有体系集成:与根本工具无缝集成
向导智能体主循环
- // Agent 主循环(领导智能体)
- public static void agentLoop(List<Map<String, Object>> messages) {
- while (true) {
- try {
- // 检查领导邮箱
- List<Map<String, Object>> inbox = BUS.readInbox("lead");
- if (!inbox.isEmpty()) {
- String inboxJson = gson.toJson(inbox);
- messages.add(Map.of(
- "role", "user",
- "content", "<inbox>" + inboxJson + "</inbox>"
- ));
-
- messages.add(Map.of(
- "role", "assistant",
- "content", "Noted inbox messages."
- ));
- // 自动邮箱检查:每次迭代前检查新消息
- // 结构化注入:用XML标签包裹,便于解析
- // 对话完整:添加assistant响应,保持结构
- }
-
- // 显示团队状态
- int activeCount = TEAM_MANAGER.getActiveCount();
- if (activeCount > 0) {
- System.out.printf("[Active teammates: %d]%n", activeCount);
- }
- // 状态监控
:实时显示活跃智能体数量 - }
- }
- }
复制代码
- 主动通讯:向导智能体主动吸取和处置惩罚消息
- 状态感知:实时相识团队状态
- 决议依据:基于团队反馈做出更好的决议
- 向导和谐:向导智能体负责和谐解个团队
架构演进与代价
从 BackgroundTasksSystem 到 AgentTeamsSystem 的升级:
维度BackgroundTasksSystemAgentTeamsSystem架构模式主从异步使命多智能体协作智能水平被动实验使命主动协作办理通讯方式效果关照布局化消息通报脚色分工无明白的脚色化分工决议机制会合决议分布式协同决议
|