"""Bash subprocess tool with timeout and output capture.""" from __future__ import annotations import os import shlex import subprocess from typing import Any from code.tools.fs import _resolve_safe, get_workspace_root # ─── Safety: commands that are forbidden by default ──────────────────── _BLOCKED_PATTERNS = [ "rm -rf /", "rm -rf ~", "rm -rf $HOME", ":(){:|:&};:", "mkfs", "dd if=/dev/zero of=/dev/", "> /dev/sda", "shutdown", "reboot", "halt", ] # Default env vars to scrub for safety _ENV_SCRUB = {"HF_TOKEN", "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "AWS_SECRET_ACCESS_KEY"} def _is_safe_command(cmd: str) -> tuple[bool, str]: """Check if a command is safe to run.""" stripped = cmd.strip() if not stripped: return False, "Empty command" for pat in _BLOCKED_PATTERNS: if pat in stripped: return False, f"Blocked pattern detected: {pat}" return True, "" def run_bash( command: str, cwd: str | None = None, timeout: int = 30, env_extra: dict[str, str] | None = None, ) -> dict[str, Any]: """Run a shell command in the workspace. Args: command: Shell command to execute. cwd: Working directory (relative to workspace root, defaults to workspace). timeout: Max seconds before killing the process. env_extra: Extra environment variables. Returns: dict with: stdout, stderr, returncode, timed_out """ try: safe, reason = _is_safe_command(command) if not safe: return { "success": False, "stdout": "", "stderr": reason, "returncode": -1, "timed_out": False, } if cwd: work_dir = _resolve_safe(cwd) else: work_dir = get_workspace_root() # Build env: scrub secrets, add extras env = {k: v for k, v in os.environ.items() if k not in _ENV_SCRUB} if env_extra: env.update(env_extra) # Run with bash -c for full shell semantics completed = subprocess.run( ["bash", "-c", command], cwd=work_dir, env=env, capture_output=True, text=True, timeout=timeout, check=False, ) # Truncate huge outputs stdout = completed.stdout stderr = completed.stderr if len(stdout) > 50_000: stdout = stdout[:50_000] + f"\n... truncated ({len(stdout) - 50_000} chars) ..." if len(stderr) > 50_000: stderr = stderr[:50_000] + f"\n... truncated ({len(stderr) - 50_000} chars) ..." return { "success": completed.returncode == 0, "stdout": stdout, "stderr": stderr, "returncode": completed.returncode, "timed_out": False, "command": command, "cwd": cwd or ".", } except subprocess.TimeoutExpired as exc: stdout = exc.stdout or "" if isinstance(exc.stdout, str) else (exc.stdout or b"").decode("utf-8", errors="replace") stderr = exc.stderr or "" if isinstance(exc.stderr, str) else (exc.stderr or b"").decode("utf-8", errors="replace") return { "success": False, "stdout": stdout, "stderr": f"Timeout after {timeout}s\n{stderr}", "returncode": -1, "timed_out": True, "command": command, } except Exception as exc: return { "success": False, "stdout": "", "stderr": str(exc), "returncode": -1, "timed_out": False, "command": command, }