Spaces:
Running
Running
fix: agent_run param mismatch (send agent_name) + add GitHub push-update (3 inputs: repo name, token, username; --force-with-lease)
b87f702 verified | """GitHub import tool — clone a repo into the sandboxed workspace. | |
| This module lets the user (or the agent, via the `/github` slash command) | |
| import an existing GitHub repository into the SoniCoder workspace so the | |
| agent can read, edit, and build on top of real code instead of starting | |
| from scratch. | |
| Security model | |
| -------------- | |
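| * Only github.com URLs are accepted, and cloning always happens over | |
| `https://github.com/...`: SSH-form URLs (`git@github.com:owner/repo.git`) | |
| are rewritten to HTTPS, and file:// or other hosts are rejected. This | |
| prevents the clone URL from being used to exfiltrate data via a | |
| malicious git config. | |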
| * The repo is cloned into a temp directory first, then *copied* into the | |
| workspace root with `.git/`, `node_modules/`, `__pycache__/`, and other | |
| heavy / unnecessary directories stripped. This: | |
| - avoids polluting the workspace with a `.git` subdir the agent would | |
| otherwise try to walk, | |
| - caps the size of the import (clone-depth 1 + ignored dirs), | |
| - sidesteps git's "destination exists" failure mode. | |
| * Optional `subdir` argument lets the user import only a sub-directory of | |
| the repo (e.g. `examples/quickstart`). This is useful for monorepos. | |
| The function returns a JSON-serializable dict that the frontend / agent | |
| loop can consume directly. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any | |
| from code.tools.fs import get_workspace_root | |
# ─── URL validation ──────────────────────────────────────────────────────
# Recognised inputs:
#   https://github.com/owner/repo
#   https://github.com/owner/repo.git
#   https://github.com/owner/repo/tree/main
#   https://github.com/owner/repo/tree/main/subdir
#   https://github.com/owner/repo/branches/main/subdir   (older form)
#   git@github.com:owner/repo.git   -> rewritten to https by the parser
#
# In _GITHUB_HTTPS the two unnamed groups are, in order, the branch
# (group 3) and the optional path below it (group 4). The pattern also
# accepts a `/blob/<branch>/...` segment in the same position.
_GITHUB_HTTPS = re.compile(
    r"^https://github\.com/"
    r"(?P<owner>[^/\s]+)/"
    r"(?P<repo>[^/\s]+?)(?:\.git)?"
    r"(?:/(?:tree|blob|branches)/([^/\s]+)(?:/(.*))?)?"
    r"/?$",
    flags=re.IGNORECASE,
)
_GITHUB_SSH = re.compile(
    r"^git@github\.com:"
    r"(?P<owner>[^/\s]+)/"
    r"(?P<repo>[^/\s]+?)(?:\.git)?"
    r"/?$",
    flags=re.IGNORECASE,
)
# Directory names that are dropped during import: they are usually huge
# and not useful inside the agent workspace.
_STRIP_DIRS = {
    # version-control metadata
    ".git", ".hg", ".svn",
    # Python environments and tool caches
    "__pycache__", ".venv", "venv", "env",
    ".tox", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    # JavaScript dependencies and framework output
    "node_modules", ".next", ".nuxt",
    # generic build output, caches and other toolchains
    "dist", "build", "target", ".gradle", ".cache", "Pods",
}

# File names that must never land in the workspace (OS clutter).
_STRIP_FILES = {".DS_Store", "Thumbs.db"}
def _parse_github_url(url: str) -> dict[str, str | None]:
    """Parse a GitHub URL into owner, repo, branch, subdir and clone URL.

    Both the HTTPS form (optionally with ``/tree/<branch>/<subdir>``) and the
    SSH form (``git@github.com:owner/repo.git``) are accepted; the returned
    ``clone_url`` is always an ``https://github.com/...`` URL.

    A trailing ``?query`` or ``#fragment`` (common in browser-copied links)
    is ignored so it cannot end up inside the repo name.

    Args:
        url: The URL to parse. ``None`` or blank is rejected.

    Returns:
        A dict with keys ``owner``, ``repo``, ``branch`` (``None`` when the
        URL names no branch), ``subdir`` (``""`` when absent, never with
        leading/trailing slashes) and ``clone_url``.

    Raises:
        ValueError: If the URL is empty, does not match either accepted
            form, or has an owner/repo that is not a plausible GitHub name
            (characters outside ``A-Z a-z 0-9 . _ -``, or ``.`` / ``..``).
    """
    url = (url or "").strip()
    if not url:
        raise ValueError("Empty GitHub URL")

    # Drop "#fragment" first, then "?query"; neither is part of the repo path.
    bare_url = url.split("#", 1)[0].split("?", 1)[0]

    branch = None
    subdir = ""
    match = _GITHUB_HTTPS.match(bare_url)
    if match:
        # Groups 3 and 4 are the unnamed branch / sub-path captures.
        branch = match.group(3)
        subdir = (match.group(4) or "").strip("/")
    else:
        match = _GITHUB_SSH.match(bare_url)

    if match:
        owner = match.group("owner")
        repo = match.group("repo")
        # The names are later used to build paths and URLs, so refuse
        # anything that could act as a path component trick ("." / "..")
        # or that GitHub itself would never issue.
        names_ok = all(
            part not in (".", "..") and re.fullmatch(r"[A-Za-z0-9._-]+", part)
            for part in (owner, repo)
        )
        if names_ok:
            return {
                "owner": owner,
                "repo": repo,
                "branch": branch,
                "subdir": subdir,
                "clone_url": f"https://github.com/{owner}/{repo}.git",
            }

    raise ValueError(
        f"Invalid GitHub URL: {url!r}. "
        "Expected https://github.com/<owner>/<repo> or git@github.com:<owner>/<repo>.git"
    )
def _git_available() -> bool:
    """Report whether a working `git` binary can be launched from PATH.

    Runs ``git --version`` with a 5-second timeout.

    Returns:
        True only when the command starts and exits with status 0. Any
        failure to launch it (missing binary, not executable, other OS
        error) or a timeout yields False instead of raising, so callers can
        turn it into a normal error response.
    """
    try:
        probe = subprocess.run(
            ["git", "--version"],
            capture_output=True,
            timeout=5,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError as well as PermissionError
        # (binary present but not executable).
        return False
    return probe.returncode == 0
def _safe_relpath(root: str, full: str) -> str:
    """Express `full` relative to `root`.

    Thin wrapper over ``os.path.relpath``: when both paths are the same the
    result is ``"."`` (not an empty string), and a path outside `root`
    comes back with leading ``..`` components.
    """
    relative = os.path.relpath(full, start=root)
    return relative
def _copy_filtered(src: str, dst: str) -> tuple[int, int]:
    """Copy the directory tree `src` into `dst`, leaving out unwanted entries.

    Directories named in ``_STRIP_DIRS`` are pruned from the walk (and
    counted); files named in ``_STRIP_FILES`` are skipped. File symlinks are
    followed only when their resolved target stays inside `src`; a link that
    points elsewhere on the host is skipped, because the tree comes from an
    untrusted repository and following it would copy arbitrary host files
    into the workspace. Symlinked directories are not descended into
    (``os.walk`` does not follow them by default).

    Args:
        src: Existing directory to copy from.
        dst: Destination directory; created if missing.

    Returns:
        ``(files_copied, dirs_skipped)``.
    """
    files_copied = 0
    dirs_skipped = 0
    src_real = os.path.realpath(src)

    def _escapes_source(path: str) -> bool:
        """True when `path` resolves to somewhere outside the source tree."""
        try:
            return os.path.commonpath([src_real, os.path.realpath(path)]) != src_real
        except ValueError:
            # commonpath() raises for paths on different drives (Windows).
            return True

    for dirpath, dirnames, fnames in os.walk(src, topdown=True):
        # Assigning through the slice mutates the list os.walk holds, which
        # is what stops it from descending into the pruned directories.
        kept = [d for d in dirnames if d not in _STRIP_DIRS]
        dirs_skipped += len(dirnames) - len(kept)
        dirnames[:] = kept

        rel = os.path.relpath(dirpath, src)
        target_dir = dst if rel == "." else os.path.join(dst, rel)
        os.makedirs(target_dir, exist_ok=True)

        for fname in fnames:
            if fname in _STRIP_FILES:
                continue
            source_path = os.path.join(dirpath, fname)
            if os.path.islink(source_path) and _escapes_source(source_path):
                continue
            try:
                shutil.copy2(source_path, os.path.join(target_dir, fname))
            except OSError:
                # Best effort: skip what cannot be copied (sockets, fifos,
                # dangling links, ...). shutil.SpecialFileError is an OSError.
                continue
            files_copied += 1
    return files_copied, dirs_skipped
| # ─── Public API ────────────────────────────────────────────────────────── | |
def _path_is_within(base: str, candidate: str) -> bool:
    """Return True when `candidate` resolves to `base` or to a path below it.

    Both paths are resolved with ``os.path.realpath`` first, so ``..``
    components and symlinks cannot be used to step outside `base`.
    """
    base_real = os.path.realpath(base)
    candidate_real = os.path.realpath(candidate)
    try:
        return os.path.commonpath([base_real, candidate_real]) == base_real
    except ValueError:
        # commonpath() raises for paths on different drives (Windows).
        return False


def _clear_directory(directory: str) -> None:
    """Best-effort removal of everything inside `directory` (it stays itself)."""
    for entry in os.listdir(directory):
        full = os.path.join(directory, entry)
        try:
            # A symlink to a directory must be unlinked, not rmtree'd:
            # shutil.rmtree refuses to operate on symbolic links.
            if os.path.isdir(full) and not os.path.islink(full):
                shutil.rmtree(full)
            else:
                os.remove(full)
        except OSError:
            # Deliberately best effort: one undeletable entry must not
            # abort the whole import.
            continue


def import_github_repo(
    url: str,
    branch: str = "",
    subdir: str = "",
    target_subdir: str = "",
    depth: int = 1,
    timeout: int = 120,
) -> dict[str, Any]:
    """Clone a GitHub repo into the workspace.

    The repo is shallow-cloned into a temporary directory and then copied
    (filtered by ``_copy_filtered``) into the destination. The destination
    is only emptied once the clone has succeeded and the requested
    sub-directory has been found, so a failed import leaves existing
    workspace content untouched.

    Args:
        url: GitHub URL (https or SSH form). May include /tree/<branch>/<subdir>.
        branch: Optional branch/tag to checkout. If empty, uses the URL's
            embedded branch (if any) or the repo's default branch.
        subdir: Optional sub-directory inside the repo to import. When
            given it takes precedence over a sub-directory embedded in the
            URL. Must stay inside the repo: ``..`` components are rejected
            and a leading ``/`` is ignored.
        target_subdir: Where inside the workspace to place the import. If
            empty, places at workspace root. If non-empty and the directory
            exists, its content is replaced. Must stay inside the
            workspace: ``..`` components are rejected.
        depth: Git clone depth, at least 1. Default 1 (shallow).
        timeout: Git clone timeout in seconds.

    Returns:
        dict with keys:
            success: bool
            message: str
            url: original URL
            error: str (on failure only)
        and, on success:
            owner, repo: parsed from the URL
            branch, subdir: effective values, or None when not set
            files_imported: int
            dirs_skipped: int
            workspace_path: path relative to the workspace root where the
                files landed ("." for the root)
            tree_preview: up to 30 sorted top-level entry names of the
                destination, directories suffixed with "/"
    """

    def _fail(message: str, error: str = "") -> dict[str, Any]:
        """Build the failure response; `error` defaults to the message."""
        return {
            "success": False,
            "message": message,
            "url": url,
            "error": error or message,
        }

    try:
        parsed = _parse_github_url(url)
    except ValueError as exc:
        return _fail(str(exc))

    # Resolve effective branch / subdir (explicit arguments win over the URL).
    effective_branch = (branch or "").strip() or parsed.get("branch") or ""
    requested_subdir = (subdir or "").strip() or parsed.get("subdir") or ""
    subdir_parts = [p for p in requested_subdir.split("/") if p not in ("", ".")]
    if ".." in subdir_parts:
        return _fail(f"Invalid subdir: {requested_subdir!r}", "invalid subdir")
    effective_subdir = "/".join(subdir_parts)

    try:
        clone_depth = int(depth)
    except (TypeError, ValueError):
        clone_depth = 0
    if clone_depth < 1:
        return _fail(f"Invalid depth: {depth!r}", "invalid depth")

    if not _git_available():
        return _fail(
            "`git` is not installed in this environment. Cannot clone.",
            "git binary not found",
        )

    workspace_root = get_workspace_root()
    os.makedirs(workspace_root, exist_ok=True)

    # Decide destination inside workspace.
    requested_target = (target_subdir or "").strip()
    if requested_target:
        target_parts = [p for p in requested_target.split("/") if p not in ("", ".")]
        dest_root = os.path.join(workspace_root, *target_parts)
        # Reject empty results, explicit "..", and anything that (through
        # symlinks or platform path quirks) resolves outside the workspace:
        # this directory is about to be emptied.
        if (
            not target_parts
            or ".." in target_parts
            or not _path_is_within(workspace_root, dest_root)
        ):
            return _fail(
                f"Invalid target_subdir: {target_subdir!r}",
                "invalid target_subdir",
            )
        workspace_rel = "/".join(target_parts)
    else:
        dest_root = workspace_root
        workspace_rel = ""

    with tempfile.TemporaryDirectory(prefix="sonicoder_gh_") as tmp:
        # Fixed directory name: the repo name comes from user input and
        # has no business being a path component.
        clone_target = os.path.join(tmp, "repo")
        cmd = ["git", "clone", "--depth", str(clone_depth), "--single-branch"]
        if effective_branch:
            cmd.extend(["--branch", effective_branch])
        cmd.extend([parsed["clone_url"], clone_target])
        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                timeout=timeout,
                text=True,
                # Fail fast instead of waiting on a credential prompt
                # (e.g. for private or non-existent repos).
                env={**os.environ, "GIT_TERMINAL_PROMPT": "0"},
            )
        except subprocess.TimeoutExpired:
            return _fail(f"Git clone timed out after {timeout}s", "clone_timeout")
        if proc.returncode != 0:
            stderr = (proc.stderr or "").strip()
            return _fail(f"git clone failed: {stderr[:500]}", stderr[:500])
        if not os.path.isdir(clone_target):
            return _fail(
                "git clone appeared to succeed but the target directory is missing",
                "clone_dir_missing",
            )

        # If a subdir is requested it must exist and must really live
        # inside the clone (a symlinked directory could point elsewhere).
        src_root = clone_target
        if effective_subdir:
            candidate = os.path.join(clone_target, *subdir_parts)
            if not os.path.isdir(candidate) or not _path_is_within(clone_target, candidate):
                return _fail(
                    f"Subdirectory '{effective_subdir}' not found in repo "
                    f"{parsed['owner']}/{parsed['repo']}",
                    "subdir_not_found",
                )
            src_root = candidate

        # Everything that can fail cheaply has succeeded: only now replace
        # the destination content. Nothing outside dest_root is touched.
        if os.path.isdir(dest_root):
            _clear_directory(dest_root)
        files_imported, dirs_skipped = _copy_filtered(src_root, dest_root)

    # Brief preview of the destination's top-level entries for the response.
    tree_summary: list[str] = []
    try:
        for name in sorted(os.listdir(dest_root))[:30]:
            full = os.path.join(dest_root, name)
            tree_summary.append(f"{name}/" if os.path.isdir(full) else name)
    except OSError:
        pass

    msg = (
        f"Imported {files_imported} file(s) from {parsed['owner']}/{parsed['repo']}"
        + (f" (branch: {effective_branch})" if effective_branch else "")
        + (f" subdir: {effective_subdir}" if effective_subdir else "")
        + (f" into {workspace_rel}/" if workspace_rel else " into workspace root")
        + f". Skipped {dirs_skipped} heavy dir(s) (.git, node_modules, etc.)."
    )
    return {
        "success": True,
        "message": msg,
        "url": url,
        "owner": parsed["owner"],
        "repo": parsed["repo"],
        "branch": effective_branch or None,
        "subdir": effective_subdir or None,
        "files_imported": files_imported,
        "dirs_skipped": dirs_skipped,
        "workspace_path": workspace_rel or ".",
        "tree_preview": tree_summary,
    }
def list_github_url_examples() -> dict[str, Any]:
    """Describe the URL shapes that import_github_repo understands.

    Intended as help text for the UI / agent.

    Returns:
        A dict with ``success`` (always True), ``examples`` (sample URLs)
        and ``notes`` (short remarks about how imports behave).
    """
    accepted_forms = [
        "https://github.com/owner/repo",
        "https://github.com/owner/repo.git",
        "https://github.com/owner/repo/tree/main",
        "https://github.com/owner/repo/tree/main/examples/quickstart",
        "git@github.com:owner/repo.git",
    ]
    usage_notes = [
        "Only github.com URLs are accepted (HTTPS or SSH form).",
        "Shallow clone (depth=1) is used by default to keep imports fast.",
        "Directories like .git, node_modules, __pycache__, .venv, dist, build are stripped.",
        "If a /tree/<branch>/<subdir> path is included, only that subdir is imported.",
    ]
    return {"success": True, "examples": accepted_forms, "notes": usage_notes}
| # ─── Push workspace to GitHub ──────────────────────────────────────────── | |
def push_to_github(
    repo_name: str,
    github_token: str,
    username: str,
    branch: str = "main",
    commit_message: str = "",
    timeout: int = 120,
) -> dict[str, Any]:
    """Push the current SoniCoder workspace to a GitHub repo.

    Workflow:
      1. Snapshot the workspace (call `snapshot_workspace()`).
      2. Create a fresh git repo in a temp dir and write the files into it.
      3. `git commit -m <message>`.
      4. Read the current tip of the remote branch with `git ls-remote`.
      5. `git push --force-with-lease=<ref>:<tip>` to
         `https://<username>:<token>@github.com/<owner>/<repo>.git`.

    The temp repo shares no history with the remote, so replacing an
    existing branch needs a forced update. The lease is given explicitly
    (the tip observed in step 4, or "must not exist" when the branch is
    absent): a bare `--force-with-lease` has no remote-tracking ref to
    compare against when pushing to a URL. If the remote branch moves
    between steps 4 and 5 the push is rejected rather than overwriting the
    newer commits. This matches the SoniCoder mental model: "the workspace
    IS the source of truth; overwrite whatever's on GitHub with my latest".

    The token is redacted ("***") from every git output and error message
    returned by this function. NOTE(review): it is still passed to git on
    the command line, so it is visible to other local processes for the
    duration of the call.

    Parameters
    ----------
    repo_name : str
        Repo name. Either "repo" (combined with `username` as
        `username/repo`) or "owner/repo" (the owner part is then used
        instead of `username`). Owner and repo may only contain
        letters, digits, ".", "_" and "-".
    github_token : str
        GitHub Personal Access Token (PAT) with `repo` scope.
    username : str
        GitHub username (or org) to push as. Required.
    branch : str
        Target branch (default "main"). Will be created on push if missing.
    commit_message : str
        Commit message. Defaults to a timestamped message.
    timeout : int
        Per-git-command timeout in seconds (default 120).

    Returns
    -------
    dict
        On success: success, message, repo_full_name, branch, commit_sha,
        commit_url, repo_url, files_pushed. On failure: success=False,
        message, error, and for push failures also repo_full_name, branch
        and commit_sha.
    """
    import datetime
    from urllib.parse import quote

    # ── Validate inputs ──────────────────────────────────────────────
    repo_name = (repo_name or "").strip()
    github_token = (github_token or "").strip()
    username = (username or "").strip()
    branch = (branch or "main").strip() or "main"
    if not commit_message:
        commit_message = (
            f"Update from SoniCoder at "
            f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )
    if not repo_name:
        return _gh_err("Repo name is required.")
    if not github_token:
        return _gh_err("GitHub token is required.")
    if not username:
        return _gh_err("Username (or org) is required.")

    # Normalize repo_name into owner/repo.
    if "/" in repo_name:
        owner, name = repo_name.split("/", 1)
        owner = owner.strip() or username
        name = name.strip()
    else:
        owner, name = username, repo_name
    # Both parts end up in URLs, so only GitHub's own alphabet is allowed
    # and the path specials "." / ".." are refused.
    for part in (owner, name):
        if part in (".", "..") or not re.fullmatch(r"[A-Za-z0-9._-]+", part):
            return _gh_err(f"Invalid repo_name: {repo_name!r}")
    repo_full_name = f"{owner}/{name}"

    # The branch is spliced into refspecs and lease expressions; refuse
    # names git would reject or that could be read as options/refspecs.
    if (
        branch.startswith(("-", "/"))
        or branch.endswith(("/", ".", ".lock"))
        or ".." in branch
        or re.search(r"[\s~^:?*\[\\]", branch)
    ):
        return _gh_err(f"Invalid branch name: {branch!r}")

    if not _git_available():
        return _gh_err("`git` is not installed in this environment. Cannot push.")

    # ── Snapshot workspace ───────────────────────────────────────────
    from code.tools.fs import snapshot_workspace

    # Presumably a mapping of workspace-relative path -> file text;
    # verify against snapshot_workspace(). Bytes are handled defensively.
    files = snapshot_workspace()
    if not files:
        return _gh_err(
            "Workspace is empty. Generate or import some code first."
        )

    quoted_token = quote(github_token, safe="")

    def _redact(text: str) -> str:
        """Remove the token (raw and URL-quoted) from text shown to callers."""
        for secret in (quoted_token, github_token):
            text = text.replace(secret, "***")
        return text

    # ── Set up a fresh git repo in temp dir ──────────────────────────
    with tempfile.TemporaryDirectory(prefix="sonicoder_gh_push_") as tmp:
        repo_dir = os.path.join(tmp, "repo")
        os.makedirs(repo_dir, exist_ok=True)
        # Never block on an interactive credential prompt.
        git_env = {**os.environ, "GIT_TERMINAL_PROMPT": "0"}

        def _run_git(args: list[str]) -> tuple[int, str, str]:
            """Run git in the temp repo; returns (returncode, stdout, stderr), redacted."""
            try:
                proc = subprocess.run(
                    ["git", *args],
                    cwd=repo_dir,
                    capture_output=True,
                    text=True,
                    timeout=timeout,
                    env=git_env,
                )
            except subprocess.TimeoutExpired:
                # 124 mirrors the exit status of timeout(1).
                return 124, "", _redact(
                    f"git {' '.join(args)} timed out after {timeout}s"
                )
            return proc.returncode, _redact(proc.stdout), _redact(proc.stderr)

        def _push_failed(detail: str, commit_sha: str) -> dict[str, Any]:
            """Standard response for any failure while talking to the remote."""
            return _gh_err(
                f"git push failed: {detail[:400]}. "
                "Check that the token has `repo` scope and the repo exists.",
                repo_full_name=repo_full_name,
                branch=branch,
                commit_sha=commit_sha,
            )

        # Init repo
        rc, _, err = _run_git(["init", "-b", branch])
        if rc != 0:
            # Older git doesn't support -b; fall back
            rc, _, err = _run_git(["init"])
            if rc != 0:
                return _gh_err(f"git init failed: {err}")
            # Then checkout/create the branch
            _run_git(["checkout", "-b", branch])

        # Set committer identity (required for commit)
        _run_git(["config", "user.email", f"{username}@users.noreply.github.com"])
        _run_git(["config", "user.name", username])

        # Write all snapshot files into the repo dir
        for rel_path, content in files.items():
            # Normalise first so "a/../../b" is recognised as an escape.
            norm = os.path.normpath(rel_path)
            first = norm.split(os.sep)[0]
            if (
                os.path.isabs(norm)
                or norm == "."
                or first == ".."
                or first.lower() == ".git"  # never overwrite repo internals
            ):
                continue
            target = os.path.join(repo_dir, norm)
            try:
                os.makedirs(os.path.dirname(target), exist_ok=True)
                if isinstance(content, (bytes, bytearray)):
                    with open(target, "wb") as f:
                        f.write(content)
                elif isinstance(content, str):
                    with open(target, "w", encoding="utf-8") as f:
                        f.write(content)
            except (OSError, UnicodeEncodeError):
                # Best effort: an unwritable file is left out of the commit.
                continue

        # Stage everything
        rc, _, err = _run_git(["add", "-A"])
        if rc != 0:
            return _gh_err(f"git add failed: {err}")

        # Check if there's anything to commit
        rc, out, _ = _run_git(["status", "--porcelain"])
        if not out.strip():
            return {
                "success": True,
                "message": "No changes to commit (workspace matches last commit).",
                "repo_full_name": repo_full_name,
                "branch": branch,
                "commit_sha": None,
                "commit_url": None,
                "repo_url": f"https://github.com/{repo_full_name}",
                "files_pushed": 0,
            }

        # Commit
        rc, out, err = _run_git(["commit", "-m", commit_message])
        if rc != 0:
            return _gh_err(f"git commit failed: {err}")

        # Get commit SHA
        rc, sha, _ = _run_git(["rev-parse", "HEAD"])
        commit_sha = sha.strip()

        # Push URL with credentials embedded (HTTPS basic auth); both parts
        # are percent-encoded so special characters cannot break the URL.
        push_url = (
            f"https://{quote(username, safe='')}:{quoted_token}"
            f"@github.com/{repo_full_name}.git"
        )
        remote_ref = f"refs/heads/{branch}"

        # Look up the current remote tip so the lease below is explicit.
        rc, out, err = _run_git(["ls-remote", push_url, remote_ref])
        if rc != 0:
            return _push_failed(err, commit_sha)
        remote_tip = ""
        for line in out.splitlines():
            tip, _, ref = line.partition("\t")
            if ref.strip() == remote_ref:
                remote_tip = tip.strip()
                break

        # An empty <expect> means "the ref must not exist yet"; otherwise
        # the push only succeeds while the remote still sits at remote_tip.
        rc, out, err = _run_git(
            [
                "push",
                f"--force-with-lease={remote_ref}:{remote_tip}",
                push_url,
                f"HEAD:{remote_ref}",
            ]
        )
        if rc != 0:
            return _push_failed(err, commit_sha)

        # Count files pushed
        rc, count_out, _ = _run_git(["ls-files"])
        files_pushed = sum(1 for line in count_out.splitlines() if line.strip())

        return {
            "success": True,
            "message": (
                f"Pushed {files_pushed} file(s) to {repo_full_name} "
                f"on branch `{branch}` (commit {commit_sha[:8]})."
            ),
            "repo_full_name": repo_full_name,
            "branch": branch,
            "commit_sha": commit_sha,
            "commit_url": f"https://github.com/{repo_full_name}/commit/{commit_sha}",
            "repo_url": f"https://github.com/{repo_full_name}",
            "files_pushed": files_pushed,
        }
def _gh_err(message: str, **extras: Any) -> dict[str, Any]:
    """Assemble the failure payload returned by push_to_github.

    `message` is reported under both "message" and "error"; any keyword
    extras are merged in afterwards, so an extra with the same key as a
    standard field replaces it.
    """
    return {
        "success": False,
        "message": message,
        "error": message,
        **extras,
    }