Spaces:
Running
Running
feat(agent): add Claude Code-style agent, skills, slash-commands, hooks, todos, sandboxed workspace, and full-stack scaffolding
81aa0b5 verified | """Bash subprocess tool with timeout and output capture.""" | |
| from __future__ import annotations | |
| import os | |
| import shlex | |
| import subprocess | |
| from typing import Any | |
| from code.tools.fs import _resolve_safe, get_workspace_root | |
# --- Safety: commands that are forbidden by default ---
# Each entry is a plain substring (not a regex or glob). _is_safe_command
# rejects a command as soon as any one of these occurs anywhere in its text.
_BLOCKED_PATTERNS = [
    "rm -rf /",  # recursive delete starting at an absolute path
    "rm -rf ~",  # recursive delete of the home directory (tilde form)
    "rm -rf $HOME",  # recursive delete of the home directory (variable form)
    ":(){:|:&};:",  # bash fork bomb, in its whitespace-free spelling
    "mkfs",  # filesystem creation tools (mkfs, mkfs.ext4, ...)
    "dd if=/dev/zero of=/dev/",  # zero-filling a device node
    "> /dev/sda",  # redirecting output onto the raw disk device
    "shutdown",
    "reboot",
    "halt",
]

# Default env vars to scrub for safety
# run_bash removes these names from the environment inherited from the parent
# process before starting the child shell.
_ENV_SCRUB = {"HF_TOKEN", "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "AWS_SECRET_ACCESS_KEY"}
def _is_safe_command(cmd: str) -> tuple[bool, str]:
    """Check a shell command against the blocklist before it is executed.

    The command is compared with every entry of ``_BLOCKED_PATTERNS`` using a
    plain substring test. Two spellings of the command are checked:

    * the command with leading/trailing whitespace removed, and
    * the same command with every run of whitespace (spaces, tabs, newlines)
      collapsed to a single space, so that ``rm   -rf   /`` or
      ``rm\\t-rf\\t/`` cannot slip past a pattern written with single spaces.

    Args:
        cmd: The raw command string.

    Returns:
        ``(True, "")`` when no pattern matches, otherwise ``(False, reason)``
        where ``reason`` is a human-readable explanation. An empty or
        whitespace-only command is rejected with ``"Empty command"``.
    """
    stripped = cmd.strip()
    if not stripped:
        return False, "Empty command"

    # str.split() with no argument splits on any whitespace run, so re-joining
    # with one space yields a canonical single-spaced form of the command.
    collapsed = " ".join(stripped.split())

    for pat in _BLOCKED_PATTERNS:
        # Keep the raw check as well so the result is never less strict than
        # a match on the unmodified text.
        if pat in stripped or pat in collapsed:
            return False, f"Blocked pattern detected: {pat}"
    return True, ""
# Upper bound on the number of characters kept from each captured stream.
_MAX_OUTPUT_CHARS = 50_000


def _clip_output(text: str, limit: int = _MAX_OUTPUT_CHARS) -> str:
    """Cap *text* at *limit* characters, appending a note with the dropped count."""
    if len(text) <= limit:
        return text
    return text[:limit] + f"\n... truncated ({len(text) - limit} chars) ..."


def _as_text(stream: str | bytes | None) -> str:
    """Return a captured stream as ``str`` whether it arrived as str, bytes or None."""
    if stream is None:
        return ""
    if isinstance(stream, bytes):
        return stream.decode("utf-8", errors="replace")
    return stream


def run_bash(
    command: str,
    cwd: str | None = None,
    timeout: int = 30,
    env_extra: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Run a shell command in the workspace and capture its output.

    The command is first checked with ``_is_safe_command``; a rejected command
    is not executed. Otherwise it is run through ``bash -c`` with the names in
    ``_ENV_SCRUB`` removed from the inherited environment.

    Args:
        command: Shell command to execute.
        cwd: Working directory, resolved through ``_resolve_safe``. When
            omitted, the value of ``get_workspace_root()`` is used.
        timeout: Max seconds before the process is killed.
        env_extra: Extra environment variables. They are applied after the
            scrub step, so a name given here is passed to the child even if
            it appears in ``_ENV_SCRUB``.

    Returns:
        A dict that always has the same keys, on every path:

        * ``success``: True only when the command ran and exited with 0.
        * ``stdout`` / ``stderr``: captured text, each capped at
          ``_MAX_OUTPUT_CHARS`` characters. On rejection or internal error
          ``stderr`` carries the reason; on timeout it is prefixed with
          ``"Timeout after <timeout>s"``.
        * ``returncode``: the process exit status, or -1 when the command was
          rejected, timed out, or could not be run.
        * ``timed_out``: True when the timeout expired.
        * ``command``: the command as given.
        * ``cwd``: the ``cwd`` argument as given, or ``"."`` when omitted.

    This function does not raise; any exception is reported in the result.
    """

    def _result(
        returncode: int,
        stdout: str = "",
        stderr: str = "",
        timed_out: bool = False,
    ) -> dict[str, Any]:
        # Single construction point so every return path exposes the same keys.
        return {
            "success": returncode == 0,
            "stdout": stdout,
            "stderr": stderr,
            "returncode": returncode,
            "timed_out": timed_out,
            "command": command,
            "cwd": cwd or ".",
        }

    try:
        safe, reason = _is_safe_command(command)
        if not safe:
            return _result(-1, stderr=reason)

        work_dir = _resolve_safe(cwd) if cwd else get_workspace_root()

        # Build env: scrub secrets, add extras
        env = {k: v for k, v in os.environ.items() if k not in _ENV_SCRUB}
        if env_extra:
            env.update(env_extra)

        # Run with bash -c for full shell semantics.
        # Decode as UTF-8 with replacement so that a command emitting bytes
        # that are not valid text still returns its output instead of failing
        # with UnicodeDecodeError after it has already run.
        completed = subprocess.run(
            ["bash", "-c", command],
            cwd=work_dir,
            env=env,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        # The partial output attached to the exception may be bytes even in
        # text mode, or None when nothing was captured.
        partial_out = _clip_output(_as_text(exc.stdout))
        partial_err = _clip_output(_as_text(exc.stderr))
        return _result(
            -1,
            stdout=partial_out,
            stderr=f"Timeout after {timeout}s\n{partial_err}",
            timed_out=True,
        )
    except Exception as exc:
        # Tool boundary: report any failure (bad cwd, bash missing, ...) as a
        # result dict rather than propagating.
        return _result(-1, stderr=str(exc))

    return _result(
        completed.returncode,
        stdout=_clip_output(completed.stdout),
        stderr=_clip_output(completed.stderr),
    )