【从0到1构建一个ClaudeAgent】并发-背景任务 [复制链接]
发表于 昨天 11:19 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
有些使用很慢,Agent 不醒目等着。比方长时间编译/构建:make, mvn compile, gradle build 或 大数据处理处罚:hadoop, spark-submit 等的一些工作
Java实当代码
  1. public class BackgroundTasksSystem {
  2.     // --- 配置 ---
  3.     private static final Path WORKDIR = Paths.get(System.getProperty("user.dir"));
  4.     private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
  5.    
  6.     // --- 后台任务管理器 ---
  7.     static class BackgroundManager {
  8.         // 任务存储
  9.         private final Map<String, TaskInfo> tasks = new ConcurrentHashMap<>();
  10.         // 通知队列
  11.         private final Queue<TaskNotification> notificationQueue = new ConcurrentLinkedQueue<>();
  12.         // 任务 ID 生成器
  13.         private final AtomicInteger taskIdCounter = new AtomicInteger(1);
  14.         // 锁
  15.         private final Object lock = new Object();
  16.         
  17.         static class TaskInfo {
  18.             String taskId;
  19.             String status;  // running, completed, timeout, error
  20.             String result;
  21.             String command;
  22.             long startTime;
  23.             Thread thread;  // 关联的执行线程
  24.         }
  25.         
  26.         static class TaskNotification {
  27.             String taskId;
  28.             String status;
  29.             String command;
  30.             String result;
  31.         }
  32.         
  33.         /**
  34.          * 启动后台任务
  35.          * 立即返回任务 ID,不等待命令完成
  36.          */
  37.         public String run(String command) {
  38.             String taskId = "task_" + taskIdCounter.getAndIncrement();
  39.             
  40.             TaskInfo task = new TaskInfo(taskId, command);
  41.             tasks.put(taskId, task);
  42.             
  43.             // 创建并启动后台线程
  44.             Thread thread = new Thread(() -> executeTask(task), "BackgroundTask-" + taskId);
  45.             thread.setDaemon(true);
  46.             task.thread = thread;
  47.             thread.start();  // 立即返回,不阻塞
  48.             
  49.             return String.format("Background task %s started: %s",
  50.                 taskId, command.substring(0, Math.min(command.length(), 80)));
  51.         }
  52.         
  53.         /**
  54.          * 线程目标:执行子进程,捕获输出,推送结果到队列
  55.          */
  56.         private void executeTask(TaskInfo task) {
  57.             String output;
  58.             String status;
  59.             
  60.             try {
  61.                 ProcessBuilder pb = new ProcessBuilder("bash", "-c", task.command);
  62.                 pb.directory(WORKDIR.toFile());
  63.                 pb.redirectErrorStream(true);
  64.                
  65.                 Process process = pb.start();
  66.                 boolean finished = process.waitFor(300, TimeUnit.SECONDS);  // 5分钟超时
  67.                
  68.                 if (!finished) {
  69.                     process.destroy();
  70.                     output = "Error: Timeout (300s)";
  71.                     status = "timeout";
  72.                 } else {
  73.                     output = new String(process.getInputStream().readAllBytes()).trim();
  74.                     status = "completed";
  75.                 }
  76.             } catch (Exception e) {
  77.                 output = "Error: " + e.getMessage();
  78.                 status = "error";
  79.             }
  80.             
  81.             // 更新任务状态
  82.             task.status = status;
  83.             task.result = output.isEmpty() ? "(no output)" :
  84.                           output.substring(0, Math.min(output.length(), 50000));
  85.             
  86.             // 添加通知到队列
  87.             synchronized (lock) {
  88.                 notificationQueue.offer(new TaskNotification(
  89.                     task.taskId,
  90.                     status,
  91.                     task.command.substring(0, Math.min(task.command.length(), 80)),
  92.                     task.result.substring(0, Math.min(task.result.length(), 500))
  93.                 ));
  94.             }
  95.         }
  96.         
  97.         /**
  98.          * 检查任务状态
  99.          * 如果指定 taskId,检查单个任务;否则列出所有任务
  100.          */
  101.         public String check(String taskId) {
  102.             if (taskId != null && !taskId.isEmpty()) {
  103.                 TaskInfo task = tasks.get(taskId);
  104.                 if (task == null) {
  105.                     return "Error: Unknown task " + taskId;
  106.                 }
  107.                 return String.format("[%s] %s\n%s",
  108.                     task.status,
  109.                     task.command.substring(0, Math.min(task.command.length(), 60)),
  110.                     task.result != null ? task.result : "(running)");
  111.             } else {
  112.                 StringBuilder sb = new StringBuilder();
  113.                 for (Map.Entry<String, TaskInfo> entry : tasks.entrySet()) {
  114.                     TaskInfo task = entry.getValue();
  115.                     sb.append(String.format("%s: [%s] %s\n",
  116.                         task.taskId,
  117.                         task.status,
  118.                         task.command.substring(0, Math.min(task.command.length(), 60))));
  119.                 }
  120.                 return sb.length() > 0 ? sb.toString().trim() : "No background tasks.";
  121.             }
  122.         }
  123.         
  124.         /**
  125.          * 清空通知队列并返回所有待处理的通知
  126.          */
  127.         public List<TaskNotification> drainNotifications() {
  128.             synchronized (lock) {
  129.                 List<TaskNotification> notifications = new ArrayList<>();
  130.                 while (!notificationQueue.isEmpty()) {
  131.                     notifications.add(notificationQueue.poll());
  132.                 }
  133.                 return notifications;
  134.             }
  135.         }
  136.         
  137.         /**
  138.          * 获取所有任务
  139.          */
  140.         public Map<String, TaskInfo> getAllTasks() {
  141.             return new HashMap<>(tasks);
  142.         }
  143.     }
  144.    
  145.     // 初始化后台管理器
  146.     private static final BackgroundManager BG_MANAGER = new BackgroundManager();
  147.    
  148.     // --- 工具枚举 ---
  149.     public enum ToolType {
  150.         BASH("bash", "Run a shell command (blocking)."),
  151.         READ_FILE("read_file", "Read file contents."),
  152.         WRITE_FILE("write_file", "Write content to file."),
  153.         EDIT_FILE("edit_file", "Replace exact text in file."),
  154.         BACKGROUND_RUN("background_run", "Run command in background thread. Returns task_id immediately."),  // 新增
  155.         CHECK_BACKGROUND("check_background", "Check background task status. Omit task_id to list all.");  // 新增
  156.         public final String name;
  157.         public final String description;
  158.         ToolType(String name, String description) { this.name = name; this.description = description; }
  159.     }
  160.     // --- 工具处理器映射 ---
  161.     private static final Map<String, ToolExecutor> TOOL_HANDLERS = new HashMap<>();
  162.    
  163.     static {
  164.         // ... 省略基础工具注册
  165.         
  166.         // 后台任务工具
  167.         TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args -> {
  168.             String command = (String) args.get("command");
  169.             return BG_MANAGER.run(command);
  170.         });
  171.         
  172.         TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args -> {
  173.             String taskId = (String) args.get("task_id");
  174.             return BG_MANAGER.check(taskId);
  175.         });
  176.     }
  177.    
  178.     // ... 省略相同的工具实现
  179.    
  180.     // --- Agent 主循环(集成后台任务通知)---
  181.     public static void agentLoop(List<Map<String, Object>> messages) {
  182.         while (true) {
  183.             try {
  184.                 // 在 LLM 调用前检查后台通知
  185.                 List<BackgroundManager.TaskNotification> notifications = BG_MANAGER.drainNotifications();
  186.                
  187.                 if (!notifications.isEmpty() && !messages.isEmpty()) {
  188.                     StringBuilder notifText = new StringBuilder();
  189.                     notifText.append("<background-results>\n");
  190.                     
  191.                     for (BackgroundManager.TaskNotification notif : notifications) {
  192.                         notifText.append(String.format("[bg:%s] %s: %s\n",
  193.                             notif.taskId, notif.status, notif.result));
  194.                     }
  195.                     
  196.                     notifText.append("</background-results>");
  197.                     
  198.                     messages.add(Map.of(
  199.                         "role", "user",
  200.                         "content", notifText.toString()
  201.                     ));
  202.                     
  203.                     messages.add(Map.of(
  204.                         "role", "assistant",
  205.                         "content", "Noted background results."
  206.                     ));
  207.                     // 异步结果注入:将后台任务结果插入到对话中
  208.                     // 结构化格式:用XML标签包裹,便于LLM解析
  209.                 }
  210.                
  211.                 // 显示当前活动任务
  212.                 Map<String, BackgroundManager.TaskInfo> activeTasks = BG_MANAGER.getAllTasks();
  213.                 int runningTasks = (int) activeTasks.values().stream()
  214.                     .filter(t -> "running".equals(t.status))
  215.                     .count();
  216.                
  217.                 if (runningTasks > 0) {
  218.                     System.out.printf("[Active background tasks: %d]\n", runningTasks);
  219.                 }
  220.                
  221.                 // ... 省略相同的 LLM 调用和工具执行逻辑
  222.                
  223.             } catch (Exception e) {
  224.                 System.err.println("Error in agent loop: " + e.getMessage());
  225.                 e.printStackTrace();
  226.                 return;
  227.             }
  228.         }
  229.     }
  230. }
复制代码
这段代码引入了背景任务体系,办理了 Agent 在实验长时间任务时的壅闭题目
关键洞察:Agent 可以在下令实验时继承工作,而不是被壅闭。
异步任务处理处罚架构

核心头脑:从同步壅闭的任务实验升级为异步非壅闭的并发处理处罚,让Agent可以大概同时处理处罚多个耗时任务,实现"并行盘算"本事,大幅提升服从和相应性。
  1. // 后台任务管理器 - 异步执行引擎
  2. static class BackgroundManager {
  3.     // 任务存储
  4.     private final Map<String, TaskInfo> tasks = new ConcurrentHashMap<>();
  5.     // 通知队列
  6.     private final Queue<TaskNotification> notificationQueue = new ConcurrentLinkedQueue<>();
  7.     // 任务 ID 生成器
  8.     private final AtomicInteger taskIdCounter = new AtomicInteger(1);
  9.     // 并发安全:使用线程安全集合
  10.     // 异步通信:通过队列传递任务结果
  11.     // 唯一标识:自动生成任务ID
  12. }
复制代码

  • 解耦实验:任务提交和实验分离,立即返回控制权
  • 并发管理:多个背景任务可以同时运行
  • 结果异步网络:通过队列机制网络完成的任务结果
  • 线程安全:使用并发聚集确保多线程安全
任务信息结构计划
  1. // 任务信息实体
  2. static class TaskInfo {
  3.     String taskId;        // 唯一标识
  4.     String status;        // 状态:running, completed, timeout, error
  5.     String result;        // 执行结果
  6.     String command;       // 执行的命令
  7.     long startTime;       // 开始时间
  8.     Thread thread;        // 关联的执行线程
  9.     // 完整状态跟踪:从启动到完成的全生命周期
  10.     // 线程关联:可以控制或监控监控执行线程
  11.     // 时间戳:支持超时和性能分析
  12. }
  13. // 任务通知实体
  14. static class TaskNotification {
  15.     String taskId;
  16.     String status;
  17.     String command;
  18.     String result;
  19.     // 轻量传输:只包含必要信息
  20.     // 结构化:易于解析和处理
  21.     // 结果截断:避免过大的通知消息
  22. }
复制代码

  • 状态驱动:明确的任务状态生命周期
  • 结果长期:任务结果可以多次查询
  • 线程管理:可以跟踪和控制实验线程
  • 变乱驱动:通过关照机制通报完成变乱
异步任务启动机制
  1. /**
  2. * 启动后台任务
  3. * 立即返回任务 ID,不等待命令完成
  4. */
  5. public String run(String command) {
  6.     String taskId = "task_" + taskIdCounter.getAndIncrement();
  7.    
  8.     TaskInfo task = new TaskInfo(taskId, command);
  9.     tasks.put(taskId, task);
  10.    
  11.     // 创建并启动后台线程
  12.     Thread thread = new Thread(() -> executeTask(task), "BackgroundTask-" + taskId);
  13.     thread.setDaemon(true);  // 守护线程,不会阻止JVM退出
  14.     task.thread = thread;
  15.     thread.start();  // 立即返回,不阻塞调用者
  16.    
  17.     return String.format("Background task %s started: %s",
  18.         taskId, command.substring(0, Math.min(command.length(), 80)));
  19.     // 异步启动:立即返回任务ID,不等待命令完成
  20.     // 守护线程:不会阻止程序正常退出
  21.     // 线程命名:便于调试和监控监控
  22. }
复制代码

  • 立即返回:不壅闭主线程,立即返回控制权
  • 保卫线程:背景任务不会克制JVM退出
  • 资源管理:线程自动整理,克制内存走漏
  • 友爱反馈:返回任务ID和简化的下令形貌
任务实验与结果网络
  1. /**
  2. * 线程目标:执行子进程,捕获输出,推送结果到队列
  3. */
  4. private void executeTask(TaskInfo task) {
  5.     String output;
  6.     String status;
  7.    
  8.     try {
  9.         ProcessBuilder pb = new ProcessBuilder("bash", "-c", task.command);
  10.         pb.directory(WORKDIR.toFile());
  11.         pb.redirectErrorStream(true);
  12.         
  13.         Process process = pb.start();
  14.         boolean finished = process.waitFor(300, TimeUnit.SECONDS);  // 5分钟超时
  15.         
  16.         if (!finished) {
  17.             process.destroy();
  18.             output = "Error: Timeout (300s)";
  19.             status = "timeout";
  20.         } else {
  21.             output = new String(process.getInputStream().readAllBytes()).trim();
  22.             status = "completed";
  23.         }
  24.     } catch (Exception e) {
  25.         output = "Error: " + e.getMessage();
  26.         status = "error";
  27.     }
  28.    
  29.     // 更新任务状态
  30.     task.status = status;
  31.     task.result = output.isEmpty() ? "(no output)" :
  32.                   output.substring(0, Math.min(output.length(), 50000));
  33.    
  34.     // 添加通知到队列
  35.     synchronized (lock) {
  36.         notificationQueue.offer(new TaskNotification(
  37.             task.taskId,
  38.             status,
  39.             task.command.substring(0, Math.min(task.command.length(), 80)),
  40.             task.result.substring(0, Math.min(task.result.length(), 500))
  41.         ));
  42.     }
  43. }
复制代码

  • 超时掩护:防止长时间运行的任务壅闭
  • 非常安全:全面捕捉实验非常
  • 内存管理:截断大结果,克制内存溢出
  • 变乱驱动:完成后立即关照主线程
智能关照注入机制
  1. // 在 LLM 调用前检查后台通知
  2. List<BackgroundManager.TaskNotification> notifications = BG_MANAGER.drainNotifications();
  3. if (!notifications.isEmpty() && !messages.isEmpty()) {
  4.     StringBuilder notifText = new StringBuilder();
  5.     notifText.append("<background-results>\n");
  6.    
  7.     for (BackgroundManager.TaskNotification notif : notifications) {
  8.         notifText.append(String.format("[bg:%s] %s: %s\n",
  9.             notif.taskId, notif.status, notif.result));
  10.     }
  11.    
  12.     notifText.append("</background-results>");
  13.    
  14.     messages.add(Map.of(
  15.         "role", "user",
  16.         "content", notifText.toString()
  17.     ));
  18.    
  19.     messages.add(Map.of(
  20.         "role", "assistant",
  21.         "content", "Noted background results."
  22.     ));
  23.     // 自动注入:自动将后台结果插入到对话中
  24.     // 结构化格式:XML标签明确标识内容类型
  25.     // 对话完整:添加assistant确认,保持对话结构
  26.     // 时机智能:在LLM调用前插入,确保LLM能看到最新结果
  27. }
复制代码

  • 自动同步:背景结果自动同步到主对话
  • 结构化格式:便于LLM辨认息争析
  • 对话集成:无缝集成到现有对话流
  • 机会优化:在决定前注入,确保信息实时性
工具集成架构
  1. // 后台任务工具集
  2. public enum ToolType {
  3.     BACKGROUND_RUN("background_run", "Run command in background thread. Returns task_id immediately."),
  4.     CHECK_BACKGROUND("check_background", "Check background task status. Omit task_id to list all.");
  5.     // 异步执行:立即返回,不阻塞
  6.     // 状态查询:支持单个和批量查询
  7.     // 语义清晰:工具名明确表示异步特性
  8. }
  9. // 工具处理器映射
  10. TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args -> {
  11.     String command = (String) args.get("command");
  12.     return BG_MANAGER.run(command);
  13.     // 委托执行:将命令转交给后台管理器
  14.     // 立即返回:不等待任务完成
  15. });
  16. TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args -> {
  17.     String taskId = (String) args.get("task_id");
  18.     return BG_MANAGER.check(taskId);
  19.     // 灵活查询:支持单任务详查和列表概览
  20. });
复制代码

  • 接口同一:与同步工具类似的调用方式
  • 异步语义:工具名明确区分同步/异步
  • 机动查询:支持多种查询方式
  • 无缝集成:与现有工具体系完全兼容
架构演进与代价

从 ContextCompactSystem 到 BackgroundTasksSystem 的升级
维度ContextCompactSystemBackgroundTasksSystem实验模式同步串行异步并行吞吐量一次一个任务并发多个任务相应性壅闭期待立即相应资源使用单线程多线程并发任务范例短任务为主是非任务混淆
免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金.
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表