Spaces:
Running
Running
feat(agent): add Claude Code-style agent, skills, slash-commands, hooks, todos, sandboxed workspace, and full-stack scaffolding
fc74cc0 verified | """Agent orchestration — Claude Code-style agent loop. | |
| The agent: | |
| 1. Receives a user prompt | |
| 2. Calls the model with available tools in system prompt | |
| 3. Parses the model's response for tool calls | |
| 4. Executes tools (with hooks checking) | |
| 5. Feeds results back to the model | |
| 6. Repeats until model stops calling tools or max iterations reached | |
| Tool call format (model outputs): | |
| ```tool | |
| read_file | |
| path: src/app.py | |
| ``` | |
| Or multi-line: | |
| ```tool | |
| write_file | |
| path: src/new.py | |
| content: | | |
| import os | |
| def main(): | |
| pass | |
| ``` | |
| The agent executes the tool, captures output, and feeds back as a | |
| user-style message in the next iteration. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import re | |
| from typing import Any, Iterator | |
| from code.commands import expand_command, parse_command_input | |
| from code.config.constants import SYSTEM_PROMPT | |
| from code.hooks import check_hook | |
| from code.skills import build_skills_context | |
| from code.tools import ( | |
| edit_file, | |
| glob_paths, | |
| grep_search, | |
| list_dir, | |
| multi_edit, | |
| read_file, | |
| run_bash, | |
| todo_read, | |
| todo_write, | |
| todo_update, | |
| write_file, | |
| ) | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ─── Tool registry ──────────────────────────────────────────────────────
# Maps the tool name the model writes on the first line of a tool block to the
# callable that implements it. Note that three public names differ from the
# imported function names: "glob" -> glob_paths, "grep" -> grep_search and
# "bash" -> run_bash.
TOOL_REGISTRY: dict[str, Any] = {
    "read_file": read_file,
    "write_file": write_file,
    "edit_file": edit_file,
    "multi_edit": multi_edit,
    "list_dir": list_dir,
    "glob": glob_paths,
    "grep": grep_search,
    "bash": run_bash,
    "todo_read": todo_read,
    "todo_write": todo_write,
    "todo_update": todo_update,
}
def _tool_schemas() -> str:
    """Return the Markdown description of all tools for the system prompt.

    Every example is wrapped in a fence tagged ``tool`` because that is the
    only fence the tool-call parser recognises; an untagged fence copied from
    the prompt would be silently ignored. Lines that belong to a ``|`` block
    scalar are indented, since the argument parser only collects indented
    lines as block content.
    """
    return """## Available Tools

You have access to these tools. To call a tool, output a fenced block with `tool` as the language, the tool name on the first line, and parameters as `key: value` pairs (one per line). For multi-line values, use YAML `|` block syntax.

### read_file
Read a text file from the workspace.
```tool
read_file
path: src/app.py
```
Optional: `offset` (1-indexed line to start from), `limit` (max lines).

### write_file
Write content to a file (creates parent dirs).
```tool
write_file
path: src/new.py
content: |
  import os
  def main():
      print("hello")
```

### edit_file
Replace text in a file.
```tool
edit_file
path: src/app.py
old_str: print("hello")
new_str: print("goodbye")
```
Optional: `replace_all: true` to replace all occurrences.

### multi_edit
Apply multiple edits atomically.
```tool
multi_edit
path: src/app.py
edits: |
  - old_str: "foo"
    new_str: "bar"
  - old_str: "baz"
    new_str: "qux"
```

### list_dir
List directory contents.
```tool
list_dir
path: src
```

### glob
Find files matching a pattern.
```tool
glob
pattern: **/*.py
path: .
```

### grep
Search file contents with regex.
```tool
grep
pattern: def main
path: .
include: *.py
```
Optional: `ignore_case: true`, `max_results: 50`.

### bash
Run a shell command (sandboxed to workspace).
```tool
bash
command: npm test
timeout: 30
```
Optional: `cwd`, `timeout` (default 30s).

### todo_write
Replace the entire todo list.
```tool
todo_write
todos: |
  - id: "1"
    content: "Set up project structure"
    status: "in_progress"
    priority: "high"
  - id: "2"
    content: "Implement API endpoints"
    status: "pending"
    priority: "high"
```

### todo_read
Read the current todo list. No parameters.

### todo_update
Update a single todo by id.
```tool
todo_update
todo_id: "1"
status: "completed"
```

## Rules
- Call ONE tool per turn. Wait for the result before calling the next.
- After tool results come back, summarize what you learned and decide the next step.
- If you don't need a tool, just respond normally.
- Use `todo_write` to track multi-step tasks.
- Always use `read_file` before `edit_file` so you know the exact content.
- Use `bash` for git, test running, and other shell tasks.
"""
# ─── Tool call parsing ──────────────────────────────────────────────────
# Matches a fence opened with ```tool and captures everything up to the next
# ``` (non-greedy, DOTALL so the body may span lines).
# NOTE(review): because the match stops at the first closing fence, a tool
# argument that itself contains a ``` sequence will be cut short there.
_TOOL_BLOCK_RE = re.compile(
    r"```tool\s*\n(.*?)```",
    re.DOTALL,
)
def _strip_block_indent(block_lines: list[str]) -> str:
    """Join block-scalar lines after removing the block's own indentation.

    The indentation of the first non-blank line defines the block indent (as
    in YAML); deeper indentation on later lines is preserved relative to it.
    """
    first = next((ln for ln in block_lines if ln.strip()), "")
    indent = first[: len(first) - len(first.lstrip(" \t"))]
    dedented = [
        # A line shallower than the block indent just loses what it has.
        ln[len(indent):] if ln.startswith(indent) else ln.lstrip(" \t")
        for ln in block_lines
    ]
    return "\n".join(dedented).rstrip()


def _parse_yaml_block(text: str) -> dict[str, Any]:
    """Parse a simple YAML-like block into a dict.

    Supported forms:
    - ``key: value`` single-line scalars (kept as strings).
    - ``key: [..]`` / ``key: {..}`` values are decoded as JSON when valid,
      otherwise kept as the raw string.
    - ``key: |`` / ``|-`` / ``>`` / ``>-`` multi-line blocks. All four are
      treated as literal blocks (no folding): the following indented lines
      are collected, dedented and right-stripped.

    Blank lines and lines starting with ``#`` between keys are skipped, as
    are lines that do not look like ``key: ...``.

    Args:
        text: The argument portion of a tool block.

    Returns:
        Mapping of keys to parsed values; empty when nothing matches.
    """
    result: dict[str, Any] = {}
    lines = text.split("\n")
    total = len(lines)
    i = 0
    while i < total:
        stripped = lines[i].rstrip()
        i += 1
        if not stripped or stripped.startswith("#"):
            continue
        m = re.match(r"^(\w+)\s*:\s*(.*)$", stripped)
        if not m:
            continue
        key, value = m.group(1), m.group(2).strip()

        if value in ("|", "|-", ">", ">-"):
            collected: list[str] = []
            while i < total:
                candidate = lines[i]
                if candidate.strip():
                    # A non-blank, unindented line starts the next key.
                    if not candidate.startswith((" ", "\t")):
                        break
                else:
                    # Blank lines belong to the block only if indented content
                    # follows; skip over runs of blank lines so that content
                    # with consecutive empty lines is not truncated.
                    j = i + 1
                    while j < total and not lines[j].strip():
                        j += 1
                    if j < total and not lines[j].startswith((" ", "\t")):
                        break
                collected.append(candidate)
                i += 1
            result[key] = _strip_block_indent(collected)
        elif value.startswith(("[", "{")):
            # Inline JSON for complex values; fall back to the raw text.
            try:
                result[key] = json.loads(value)
            except json.JSONDecodeError:
                result[key] = value
        else:
            result[key] = value
    return result
def _strip_quotes(value: str) -> str:
    """Remove one layer of matching quotes around a fully quoted scalar.

    A value such as ``"a" + "b"`` is left untouched because the quote
    character also appears inside it, so it is not a single quoted scalar.
    """
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
        inner = value[1:-1]
        if value[0] not in inner:
            return inner
    return value


def _parse_yaml_list(text: str) -> list[dict[str, Any]]:
    """Parse a flat YAML-style list of ``- key: value`` mappings."""
    items: list[dict[str, Any]] = []
    # The leading "\n" lets the first "- " item match the same split pattern.
    for chunk in re.split(r"\n\s*-\s+", "\n" + text):
        if not chunk.strip():
            continue
        item: dict[str, Any] = {}
        for line in chunk.splitlines():
            m = re.match(r"(\w+):\s*(.*)$", line.strip())
            if m:
                item[m.group(1)] = _strip_quotes(m.group(2).strip())
        if item:
            items.append(item)
    return items


def _parse_tool_call(text: str) -> dict[str, Any] | None:
    """Parse the content of one tool block into ``{tool, args}``.

    The first line is the tool name; the remainder is parsed as YAML-like
    arguments. Known numeric fields (``timeout``, ``offset``, ``limit``,
    ``max_results``) are converted to ``int`` when possible and known flags
    (``replace_all``, ``ignore_case``) to ``bool``. ``todos`` and ``edits``
    given as block text are expanded into lists of dicts, with surrounding
    quotes removed from each value, matching the quoted style shown in the
    tool descriptions. For ``todo_update`` the scalar arguments are unquoted
    the same way.

    Returns:
        ``{"tool": name, "args": {...}}`` for a registered tool, or
        ``{"tool": name, "error": "Unknown tool: ..."}`` otherwise.
    """
    head, _, args_block = text.strip().partition("\n")
    tool_name = head.strip()
    if tool_name not in TOOL_REGISTRY:
        return {"tool": tool_name, "error": f"Unknown tool: {tool_name}"}

    args = _parse_yaml_block(args_block)

    for int_key in ("timeout", "offset", "limit", "max_results"):
        if int_key in args:
            try:
                args[int_key] = int(args[int_key])
            except (ValueError, TypeError):
                # Leave the raw value; the tool reports the bad argument.
                pass

    for bool_key in ("replace_all", "ignore_case"):
        if bool_key in args:
            args[bool_key] = str(args[bool_key]).lower() in ("true", "1", "yes")

    for list_key in ("todos", "edits"):
        if isinstance(args.get(list_key), str):
            args[list_key] = _parse_yaml_list(args[list_key])

    if tool_name == "todo_update":
        # The documented call quotes its scalars (todo_id: "1").
        for key, val in args.items():
            if isinstance(val, str):
                args[key] = _strip_quotes(val)

    return {"tool": tool_name, "args": args}
def find_tool_calls(text: str) -> list[dict[str, Any]]:
    """Return the parsed tool calls found in *text*, in order of appearance.

    Each fenced tool block is handed to the single-call parser; blocks for
    which it yields nothing are dropped.
    """
    parsed_blocks = (
        _parse_tool_call(found.group(1))
        for found in _TOOL_BLOCK_RE.finditer(text)
    )
    return [call for call in parsed_blocks if call]
| # ─── Tool execution with hooks ────────────────────────────────────────── | |
def execute_tool(tool_name: str, args: dict[str, Any]) -> dict[str, Any]:
    """Execute a single tool after running the relevant hook check.

    ``bash`` calls are checked with the command text; ``write_file``,
    ``edit_file`` and ``multi_edit`` are checked with the target path and the
    text being written. For ``multi_edit`` that text is the concatenation of
    every edit's ``new_str`` so file hooks see the replacement content rather
    than an empty string. Other tools run without a hook check.

    Args:
        tool_name: Key into ``TOOL_REGISTRY``.
        args: Keyword arguments forwarded to the tool.

    Returns:
        The tool's result (with ``hook_warnings`` attached when the hook
        produced any), or a ``{"success": False, "error": ...}`` dict when
        the tool is unknown, blocked by a hook, or raises.
    """
    if tool_name not in TOOL_REGISTRY:
        return {"success": False, "error": f"Unknown tool: {tool_name}"}

    # Hook check
    if tool_name == "bash":
        hook_result = check_hook("bash", {"command": str(args.get("command", ""))})
    elif tool_name in {"write_file", "edit_file", "multi_edit"}:
        if tool_name == "multi_edit":
            edits = args.get("edits")
            if isinstance(edits, list):
                new_text = "\n".join(
                    str(edit.get("new_str", ""))
                    for edit in edits
                    if isinstance(edit, dict)
                )
            else:
                # Unparsed edits: let the hook inspect the raw text.
                new_text = str(edits or "")
        else:
            new_text = str(args.get("content", args.get("new_str", "")))
        hook_result = check_hook(
            "file",
            {"file_path": str(args.get("path", "")), "new_text": new_text},
        )
    else:
        hook_result = {"blocked": False, "warnings": [], "matched_hooks": []}

    if hook_result["blocked"]:
        return {
            "success": False,
            "error": "Blocked by hook rule",
            "hook_warnings": hook_result["warnings"],
            "blocked": True,
        }

    try:
        fn = TOOL_REGISTRY[tool_name]
        result = fn(**args) if args else fn()
    except TypeError as exc:
        return {"success": False, "error": f"Invalid arguments: {exc}"}
    except Exception as exc:
        logger.exception("Tool execution failed: %s", tool_name)
        return {"success": False, "error": str(exc)}

    # Attach warnings outside the try so a non-dict result is not misreported
    # as an argument error.
    if hook_result["warnings"] and isinstance(result, dict):
        result["hook_warnings"] = hook_result["warnings"]
    return result
# ─── Agent loop ─────────────────────────────────────────────────────────
# Default cap on model/tool round-trips per run_agent call; an agent config
# with a "max_iterations" entry overrides it (clamped to 1..40 in run_agent).
MAX_ITERATIONS = 8
def build_agent_system_prompt(
    target_language: str = "",
    target_framework: str = "",
    skills: list[str] | None = None,
    agent_name: str | None = None,
) -> str:
    """Build the system prompt with tool descriptions and skill context.

    The prompt is the base ``SYSTEM_PROMPT``, the tool descriptions, a fixed
    "Agent Behavior" section, an optional ``Target:`` line, an optional
    custom-agent extension and an optional "Skills Loaded" section.

    If `agent_name` is provided and matches a saved custom agent, that
    agent's persona/system-prompt extension is appended, and any skills
    declared on the agent are merged after the caller-supplied ones. A
    missing agent or a failure while loading it is logged and the default
    prompt is used.

    Args:
        target_language: Language shown on the ``Target:`` line, if any.
        target_framework: Framework shown on the ``Target:`` line, if any.
        skills: Skill names selected by the caller.
        agent_name: Name of a custom agent to layer on top.

    Returns:
        The assembled prompt, parts joined by newlines.
    """
    parts = [
        SYSTEM_PROMPT,
        "",
        _tool_schemas(),
        "",
        "## Agent Behavior",
        "",
        "- You are an autonomous coding agent. Use tools to inspect and modify the workspace.",
        "- Always plan first with `todo_write` when given a multi-step task.",
        "- Use `read_file` before `edit_file` to know exact content.",
        "- After each tool result, briefly note what you learned before the next step.",
        "- When done, give a concise summary of what you did and what files changed.",
        "- If a hook warns you, acknowledge it and adjust your approach.",
    ]

    # Only the values actually given appear, so a framework without a language
    # does not render as "Target:  / framework".
    target_bits = [bit for bit in (target_language, target_framework) if bit]
    if target_bits:
        parts.append("")
        parts.append("Target: " + " / ".join(target_bits))

    # ── Custom agent persona ────────────────────────────────────────────
    merged_skills = list(skills or [])
    if agent_name:
        try:
            # Imported lazily; only the names used here are imported so an
            # unrelated missing symbol cannot disable persona loading.
            from code.agents import build_agent_system_prompt_extension, get_agent

            agent_cfg = get_agent(agent_name)
            if agent_cfg:
                ext = build_agent_system_prompt_extension(agent_name)
                if ext:
                    parts.append("")
                    parts.append(ext)
                # Auto-merge agent-declared skills (after user-selected ones)
                for skill in agent_cfg.get("skills") or []:
                    if skill not in merged_skills:
                        merged_skills.append(skill)
            else:
                logger.warning("Agent '%s' not found; running default", agent_name)
        except Exception as exc:
            # Best effort: a broken agent definition must not block the run.
            logger.warning("Failed to load custom agent '%s': %s", agent_name, exc)

    skills_ctx = build_skills_context(merged_skills or None)
    if skills_ctx:
        parts.append("")
        parts.append("## Skills Loaded")
        parts.append("")
        parts.append(skills_ctx)
    return "\n".join(parts)
def run_agent(
    user_input: str,
    history: list[dict[str, str]] | None = None,
    target_language: str = "",
    target_framework: str = "",
    skills: list[str] | None = None,
    search_context: str = "",
    image_url: str | None = None,
    agent_name: str | None = None,
) -> Iterator[dict[str, Any]]:
    """Run the agent loop, yielding progress events as dicts.

    Flow: expand a slash command if present, run the prompt hook, verify the
    model is loaded, resolve the active agent, then alternate between calling
    the model and executing the tool calls found in its reply until a reply
    contains no tool call or the iteration cap is reached.

    Events:
    - {type: "status", status_text, status_state, ...}
    - {type: "streaming", content, iteration}
    - {type: "tool_call", tool, args, iteration}
    - {type: "tool_result", tool, result, iteration}
    - {type: "complete", content, iterations}
    - {type: "error", message, ...}

    Args:
        user_input: Raw user prompt, possibly a slash command.
        history: Prior turns; only non-empty user/assistant entries are used.
        target_language: Forwarded to the system-prompt builder.
        target_framework: Forwarded to the system-prompt builder.
        skills: Skill names forwarded to the system-prompt builder.
        search_context: Text appended to the prompt as web search results.
        image_url: Forwarded to the model call.
        agent_name: Custom agent to run as; falls back to the active agent.
    """
    from code.model.inference import call_model
    from code.model.loader import get_model_status, is_model_loaded

    history = history or []

    # Check for slash command
    cmd_name, cmd_args = parse_command_input(user_input)
    if cmd_name:
        expansion = expand_command(cmd_name, cmd_args)
        if expansion.get("success"):
            # Replace user input with expanded command
            user_input = expansion["prompt"]
            yield {
                "type": "status",
                "status_text": f"Running /{cmd_name} command...",
                "status_state": "working",
            }
        else:
            yield {
                "type": "error",
                "message": expansion.get("error", "Unknown command"),
                "available": expansion.get("available", []),
            }
            return

    # Hook check on user prompt
    prompt_hook = check_hook("prompt", {"user_prompt": user_input})
    if prompt_hook["blocked"]:
        yield {
            "type": "error",
            "message": "Prompt blocked by hook rule",
            "warnings": prompt_hook["warnings"],
        }
        return

    # Model status
    if not is_model_loaded():
        status = get_model_status()
        yield {
            "type": "error",
            "message": status["message"],
        }
        return

    # ── Resolve active agent (explicit > session-active > none) ──────────
    if not agent_name:
        try:
            from code.agents import get_active_agent

            agent_name = get_active_agent()
        except Exception:
            agent_name = None

    # ── Special-case: /agent create — AI generates an agent definition ──
    # The slash-command expansion already substituted the AGENT_GENERATION_PROMPT
    # into `user_input`, so we just need to make sure agent_name is NOT applied
    # (we want the default SoniCoder persona to author the new agent).
    creating_agent = False
    if user_input.lstrip().startswith("You are creating a custom agent definition"):
        creating_agent = True
        agent_name = None  # don't layer persona on top of meta-prompt

    # ── Load agent config (for tool whitelist + max_iterations) ─────────
    agent_cfg = None
    if agent_name and not creating_agent:
        try:
            from code.agents import get_agent

            agent_cfg = get_agent(agent_name)
            if not agent_cfg:
                yield {
                    "type": "status",
                    "status_text": f"Agent '{agent_name}' not found; using default.",
                    "status_state": "warning",
                }
                agent_name = None
            else:
                yield {
                    "type": "status",
                    "status_text": f"Running as agent: {agent_cfg['name']}",
                    "status_state": "working",
                    "agent": agent_cfg["name"],
                }
        except Exception as exc:
            logger.warning("Failed to load agent '%s': %s", agent_name, exc)
            agent_name = None
            # Drop a half-loaded config too, so its whitelist and iteration
            # cap are not applied to what is now a default run.
            agent_cfg = None

    # Determine iteration cap
    iter_cap = MAX_ITERATIONS
    if agent_cfg and agent_cfg.get("max_iterations"):
        try:
            iter_cap = max(1, min(40, int(agent_cfg["max_iterations"])))
        except (ValueError, TypeError):
            pass

    # Tool whitelist of the active agent; None means "no restriction".
    allowed = None
    if agent_cfg and agent_cfg.get("tools"):
        allowed = set(agent_cfg["tools"])

    # Build system prompt
    system_prompt = build_agent_system_prompt(
        target_language, target_framework, skills, agent_name=agent_name
    )

    # Add search context if present
    if search_context:
        user_input = f"{user_input}\n\n--- Web Search Results ---\n{search_context}"

    # Build messages
    messages: list[dict[str, Any]] = [{"role": "system", "content": system_prompt}]
    for h in history:
        role = h.get("role", "user")
        content = str(h.get("content", "")).strip()
        if role in {"user", "assistant"} and content:
            messages.append({"role": role, "content": content})
    messages.append({"role": "user", "content": user_input})

    # Agent loop
    for iteration in range(iter_cap):
        step = iteration + 1
        yield {
            "type": "status",
            "status_text": f"Thinking... (step {iteration + 1})",
            "status_state": "working",
            "iteration": step,
        }

        # Call model; each partial is the response so far, the last one wins.
        full_response = ""
        for partial in call_model(messages, image_url=image_url):
            full_response = partial
            yield {
                "type": "streaming",
                "content": partial,
                "iteration": step,
            }

        if not full_response:
            yield {"type": "error", "message": "Empty model response"}
            return

        # Check for tool calls
        tool_calls = find_tool_calls(full_response)
        if not tool_calls:
            # No tools called — final response
            yield {
                "type": "complete",
                "content": full_response,
                "iterations": step,
            }
            return

        # Execute each tool call in order, collecting the feedback text.
        feedback: list[str] = []
        for tc in tool_calls:
            tool_name = tc.get("tool")
            args = tc.get("args", {})

            if allowed is not None and tool_name not in allowed:
                # ── Enforce agent tool whitelist ────────────────────────
                tool_result = {
                    "success": False,
                    "error": (
                        f"Tool '{tool_name}' is not in the active agent's "
                        f"whitelist: {sorted(allowed)}. The agent '{agent_cfg['name']}' "
                        "is configured to use only those tools."
                    ),
                    "blocked_by_agent_whitelist": True,
                }
                # Tell the model to pick a different tool.
                followup = "Choose a different tool from the agent's whitelist, or finish if done."
            else:
                if "error" in tc:
                    # Unknown tool
                    tool_result = {"success": False, "error": tc["error"]}
                else:
                    yield {
                        "type": "tool_call",
                        "tool": tool_name,
                        "args": args,
                        "iteration": step,
                    }
                    tool_result = execute_tool(tool_name, args)
                followup = "Continue with the next step or finish if done."

            yield {
                "type": "tool_result",
                "tool": tool_name,
                "result": tool_result,
                "iteration": step,
            }
            result_str = json.dumps(tool_result, indent=2, default=str)
            feedback.append(
                f"Tool `{tool_name}` result:\n```json\n{result_str}\n```\n\n{followup}"
            )

        # Feed results back to the model. The assistant turn is recorded once
        # per response (not once per tool call) so a reply with several tool
        # calls is not duplicated in the context.
        messages.append({"role": "assistant", "content": full_response})
        messages.append({"role": "user", "content": "\n\n".join(feedback)})

    # Max iterations reached
    yield {
        "type": "complete",
        "content": full_response + "\n\n_(Max iterations reached)_",
        "iterations": iter_cap,
    }