"""GitHub import tool — clone a repo into the sandboxed workspace. This module lets the user (or the agent, via the `/github` slash command) import an existing GitHub repository into the SoniCoder workspace so the agent can read, edit, and build on top of real code instead of starting from scratch. Security model -------------- * Only `https://github.com/...` URLs are accepted (no SSH, no file://, no other hosts). This prevents the clone URL from being used to exfiltrate data via a malicious git config. * The repo is cloned into a temp directory first, then *copied* into the workspace root with `.git/`, `node_modules/`, `__pycache__/`, and other heavy / unnecessary directories stripped. This: - avoids polluting the workspace with a `.git` subdir the agent would otherwise try to walk, - caps the size of the import (clone-depth 1 + ignored dirs), - sidesteps git's "destination exists" failure mode. * Optional `subdir` argument lets the user import only a sub-directory of the repo (e.g. `examples/quickstart`). This is useful for monorepos. The function returns a JSON-serializable dict that the frontend / agent loop can consume directly. """ from __future__ import annotations import os import re import shutil import subprocess import tempfile from pathlib import Path from typing import Any from code.tools.fs import get_workspace_root # ─── URL validation ────────────────────────────────────────────────────── # Accept: # https://github.com/owner/repo # https://github.com/owner/repo.git # https://github.com/owner/repo/tree/main # https://github.com/owner/repo/tree/main/subdir # https://github.com/owner/repo/branches/main/subdir (older form) # git@github.com:owner/repo.git → rewritten to https _GITHUB_HTTPS = re.compile( r"^https://github\.com/(?P[^/\s]+)/(?P[^/\s]+?)(?:\.git)?(?:/(?:tree|blob|branches)/([^/\s]+)(?:/(.*))?)?/?$", re.IGNORECASE, ) _GITHUB_SSH = re.compile( r"^git@github\.com:(?P[^/\s]+)/(?P[^/\s]+?)(?:\.git)?/?$", re.IGNORECASE, ) # Directories that are usually huge / not useful in the agent workspace _STRIP_DIRS = { ".git", ".hg", ".svn", "node_modules", "__pycache__", ".venv", "venv", "env", ".tox", ".mypy_cache", ".pytest_cache", ".ruff_cache", "dist", "build", ".next", ".nuxt", ".cache", ".gradle", "target", "Pods", } # Files we never want to land in the workspace _STRIP_FILES = { ".DS_Store", "Thumbs.db", } def _parse_github_url(url: str) -> dict[str, str | None]: """Parse a GitHub URL into owner, repo, branch, subdir. Returns a dict with keys: owner, repo, branch, subdir, clone_url. Raises ValueError if the URL is not a valid GitHub URL. """ url = (url or "").strip() if not url: raise ValueError("Empty GitHub URL") m = _GITHUB_HTTPS.match(url) if m: owner = m.group("owner") repo = m.group("repo") branch = m.group(3) subdir = m.group(4) or "" clone_url = f"https://github.com/{owner}/{repo}.git" return { "owner": owner, "repo": repo, "branch": branch, "subdir": subdir.strip("/"), "clone_url": clone_url, } m = _GITHUB_SSH.match(url) if m: owner = m.group("owner") repo = m.group("repo") clone_url = f"https://github.com/{owner}/{repo}.git" return { "owner": owner, "repo": repo, "branch": None, "subdir": "", "clone_url": clone_url, } raise ValueError( f"Invalid GitHub URL: {url!r}. " "Expected https://github.com// or git@github.com:/.git" ) def _git_available() -> bool: """Check whether the `git` binary is on PATH.""" try: result = subprocess.run( ["git", "--version"], capture_output=True, timeout=5, ) return result.returncode == 0 except (FileNotFoundError, subprocess.TimeoutExpired): return False def _safe_relpath(root: str, full: str) -> str: """Return path relative to root, or empty if equal to root.""" return os.path.relpath(full, root) def _copy_filtered(src: str, dst: str) -> tuple[int, int]: """Copy `src` (a directory) into `dst`, skipping _STRIP_DIRS/_STRIP_FILES. Returns (files_copied, dirs_skipped). """ files_copied = 0 dirs_skipped = 0 for dirpath, dirnames, fnames in os.walk(src, topdown=True): # In-place mutate dirnames so os.walk skips them keep_dirs = [] for d in dirnames: if d in _STRIP_DIRS: dirs_skipped += 1 else: keep_dirs.append(d) dirnames[:] = keep_dirs rel = _safe_relpath(src, dirpath) target_dir = dst if rel == "." else os.path.join(dst, rel) os.makedirs(target_dir, exist_ok=True) for fname in fnames: if fname in _STRIP_FILES: continue try: shutil.copy2(os.path.join(dirpath, fname), os.path.join(target_dir, fname)) files_copied += 1 except (OSError, shutil.SpecialFileError): # Skip files we can't copy (sockets, fifos, etc.) continue return files_copied, dirs_skipped # ─── Public API ────────────────────────────────────────────────────────── def import_github_repo( url: str, branch: str = "", subdir: str = "", target_subdir: str = "", depth: int = 1, timeout: int = 120, ) -> dict[str, Any]: """Clone a GitHub repo into the workspace. Args: url: GitHub URL (https or SSH form). May include /tree//. branch: Optional branch/tag to checkout. If empty, uses the URL's embedded branch (if any) or the repo's default branch. subdir: Optional sub-directory inside the repo to import. If the URL already includes /tree//, this is added on top (rare). target_subdir: Where inside the workspace to place the import. If empty, places at workspace root. If non-empty and the directory exists, it will be overwritten. depth: Git clone depth. Default 1 (shallow). timeout: Git clone timeout in seconds. Returns: dict with keys: success: bool message: str url: original URL owner, repo, branch, subdir: parsed files_imported: int dirs_skipped: int workspace_path: relative path inside the workspace where files landed tree: dict (workspace tree summary, optional) error: str (on failure) """ try: parsed = _parse_github_url(url) except ValueError as exc: return { "success": False, "message": str(exc), "url": url, "error": str(exc), } # Resolve effective branch / subdir effective_branch = branch.strip() or parsed.get("branch") or "" effective_subdir = subdir.strip() or parsed.get("subdir") or "" if not _git_available(): return { "success": False, "message": "`git` is not installed in this environment. Cannot clone.", "url": url, "error": "git binary not found", } workspace_root = get_workspace_root() os.makedirs(workspace_root, exist_ok=True) # Decide destination inside workspace if target_subdir.strip(): # Sanitize target_subdir — no path escapes target_rel = target_subdir.strip().strip("/").lstrip(".") if not target_rel or target_rel.startswith("/"): return { "success": False, "message": f"Invalid target_subdir: {target_subdir!r}", "url": url, "error": "invalid target_subdir", } dest_root = os.path.join(workspace_root, target_rel) workspace_rel = target_rel else: dest_root = workspace_root workspace_rel = "" # If dest_root already has content, clear it first so the import is clean. # (We never touch anything outside dest_root.) if os.path.isdir(dest_root): for entry in os.listdir(dest_root): full = os.path.join(dest_root, entry) try: if os.path.isdir(full): shutil.rmtree(full) else: os.remove(full) except OSError as exc: # Don't fail the whole import over a single file pass # Clone into a temp dir with tempfile.TemporaryDirectory(prefix="sonicoder_gh_") as tmp: clone_target = os.path.join(tmp, parsed["repo"]) cmd = [ "git", "clone", "--depth", str(int(depth)), "--single-branch", ] if effective_branch: cmd.extend(["--branch", effective_branch]) cmd.extend([parsed["clone_url"], clone_target]) try: proc = subprocess.run( cmd, capture_output=True, timeout=timeout, text=True, ) except subprocess.TimeoutExpired: return { "success": False, "message": f"Git clone timed out after {timeout}s", "url": url, "error": "clone_timeout", } if proc.returncode != 0: stderr = (proc.stderr or "").strip() return { "success": False, "message": f"git clone failed: {stderr[:500]}", "url": url, "error": stderr[:500], } if not os.path.isdir(clone_target): return { "success": False, "message": "git clone appeared to succeed but the target directory is missing", "url": url, "error": "clone_dir_missing", } # If subdir is requested, validate it src_root = clone_target if effective_subdir: candidate = os.path.join(clone_target, effective_subdir) if not os.path.isdir(candidate): return { "success": False, "message": ( f"Subdirectory '{effective_subdir}' not found in repo " f"{parsed['owner']}/{parsed['repo']}" ), "url": url, "error": "subdir_not_found", } src_root = candidate # Copy filtered files into dest_root files_imported, dirs_skipped = _copy_filtered(src_root, dest_root) # Build a brief tree summary (top 2 levels) for the response tree_summary: list[str] = [] try: for name in sorted(os.listdir(dest_root))[:30]: full = os.path.join(dest_root, name) tree_summary.append(f"{name}/" if os.path.isdir(full) else name) except OSError: pass msg = ( f"Imported {files_imported} file(s) from {parsed['owner']}/{parsed['repo']}" + (f" (branch: {effective_branch})" if effective_branch else "") + (f" subdir: {effective_subdir}" if effective_subdir else "") + (f" into {workspace_rel}/" if workspace_rel else " into workspace root") + f". Skipped {dirs_skipped} heavy dir(s) (.git, node_modules, etc.)." ) return { "success": True, "message": msg, "url": url, "owner": parsed["owner"], "repo": parsed["repo"], "branch": effective_branch or None, "subdir": effective_subdir or None, "files_imported": files_imported, "dirs_skipped": dirs_skipped, "workspace_path": workspace_rel or ".", "tree_preview": tree_summary, } def list_github_url_examples() -> dict[str, Any]: """Return example GitHub URL formats accepted by import_github_repo. Useful for surfacing help in the UI / agent. """ return { "success": True, "examples": [ "https://github.com/owner/repo", "https://github.com/owner/repo.git", "https://github.com/owner/repo/tree/main", "https://github.com/owner/repo/tree/main/examples/quickstart", "git@github.com:owner/repo.git", ], "notes": [ "Only github.com URLs are accepted (HTTPS or SSH form).", "Shallow clone (depth=1) is used by default to keep imports fast.", "Directories like .git, node_modules, __pycache__, .venv, dist, build are stripped.", "If a /tree// path is included, only that subdir is imported.", ], } # ─── Push workspace to GitHub ──────────────────────────────────────────── def push_to_github( repo_name: str, github_token: str, username: str, branch: str = "main", commit_message: str = "", timeout: int = 120, ) -> dict[str, Any]: """Push the current SoniCoder workspace to a GitHub repo. Workflow: 1. Snapshot the workspace (call `snapshot_workspace()`). 2. Create a fresh git repo in a temp dir, copy the files in. 3. `git commit -m `. 4. `git push https://@github.com//.git `. If the remote repo already exists with history, the push uses `--force-with-lease` so the new commit replaces the remote tip. This matches the SoniCoder mental model: "the workspace IS the source of truth; overwrite whatever's on GitHub with my latest". Parameters ---------- repo_name : str Repo name. Either "repo" (combined with `username` as `username/repo`) or "username/repo" (username arg then ignored). github_token : str GitHub Personal Access Token (PAT) with `repo` scope. username : str GitHub username (or org) to push as. Required. branch : str Target branch (default "main"). Will be created on push if missing. commit_message : str Commit message. Defaults to a timestamped message. timeout : int Per-git-command timeout in seconds (default 120). Returns ------- dict with keys: success, message, repo_full_name, branch, commit_sha, commit_url, repo_url, files_pushed, error (on failure) """ import datetime import subprocess # ── Validate inputs ────────────────────────────────────────────── repo_name = (repo_name or "").strip() github_token = (github_token or "").strip() username = (username or "").strip() branch = (branch or "main").strip() or "main" if not commit_message: commit_message = ( f"Update from SoniCoder at " f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) if not repo_name: return _gh_err("Repo name is required.") if not github_token: return _gh_err("GitHub token is required.") if not username: return _gh_err("Username (or org) is required.") # Normalize repo_name into owner/repo if "/" in repo_name: owner, name = repo_name.split("/", 1) owner = owner.strip() or username name = name.strip() else: owner = username name = repo_name if not name: return _gh_err(f"Invalid repo_name: {repo_name!r}") repo_full_name = f"{owner}/{name}" if not _git_available(): return _gh_err("`git` is not installed in this environment. Cannot push.") # ── Snapshot workspace ─────────────────────────────────────────── from code.tools.fs import snapshot_workspace files = snapshot_workspace() if not files: return _gh_err( "Workspace is empty. Generate or import some code first." ) # ── Set up a fresh git repo in temp dir ────────────────────────── with tempfile.TemporaryDirectory(prefix="sonicoder_gh_push_") as tmp: repo_dir = os.path.join(tmp, name) os.makedirs(repo_dir, exist_ok=True) def _run_git(args: list[str], cwd: str = repo_dir) -> tuple[int, str, str]: try: proc = subprocess.run( ["git", *args], cwd=cwd, capture_output=True, text=True, timeout=timeout, ) return proc.returncode, proc.stdout, proc.stderr except subprocess.TimeoutExpired: return 124, "", f"git {' '.join(args)} timed out after {timeout}s" # Init repo rc, _, err = _run_git(["init", "-b", branch]) if rc != 0: # Older git doesn't support -b; fall back rc, _, err = _run_git(["init"]) if rc != 0: return _gh_err(f"git init failed: {err}") # Then checkout/create the branch _run_git(["checkout", "-b", branch]) # Set committer identity (required for commit) _run_git(["config", "user.email", f"{username}@users.noreply.github.com"]) _run_git(["config", "user.name", username]) # Write all snapshot files into the repo dir for rel_path, content in files.items(): # Safety: skip absolute paths and parent-escape attempts if os.path.isabs(rel_path) or rel_path.startswith(".."): continue target = os.path.join(repo_dir, rel_path) os.makedirs(os.path.dirname(target) or repo_dir, exist_ok=True) try: with open(target, "w", encoding="utf-8") as f: f.write(content) except (OSError, UnicodeEncodeError): continue # Stage everything rc, _, err = _run_git(["add", "-A"]) if rc != 0: return _gh_err(f"git add failed: {err}") # Check if there's anything to commit rc, out, _ = _run_git(["status", "--porcelain"]) has_changes = bool(out.strip()) if not has_changes: return { "success": True, "message": "No changes to commit (workspace matches last commit).", "repo_full_name": repo_full_name, "branch": branch, "commit_sha": None, "commit_url": None, "repo_url": f"https://github.com/{repo_full_name}", "files_pushed": 0, } # Commit rc, out, err = _run_git(["commit", "-m", commit_message]) if rc != 0: return _gh_err(f"git commit failed: {err}") # Get commit SHA rc, sha, _ = _run_git(["rev-parse", "HEAD"]) commit_sha = sha.strip() # Build the push URL with token embedded (HTTPS basic auth). # Token is URL-safe enough for typical PATs (alphanumeric). push_url = ( f"https://{username}:{github_token}@github.com/{repo_full_name}.git" ) # Push with --force-with-lease so we don't silently overwrite # someone else's commits if the remote moved. rc, out, err = _run_git( ["push", "--force-with-lease", push_url, branch] ) if rc != 0: # If --force-with-lease fails because the remote doesn't exist # yet (no refs to lease against), retry with a plain push. if "no matching references" in err.lower() or "delete" in err.lower(): rc, out, err = _run_git(["push", push_url, branch]) if rc != 0: # Don't leak the token in the error safe_err = err.replace(github_token, "***").replace(push_url, "https://github.com/{repo_full_name}.git") return _gh_err( f"git push failed: {safe_err[:400]}. " "Check that the token has `repo` scope and the repo exists.", repo_full_name=repo_full_name, branch=branch, commit_sha=commit_sha, ) # Count files pushed rc, count_out, _ = _run_git(["ls-files"]) files_pushed = len([l for l in count_out.splitlines() if l.strip()]) return { "success": True, "message": ( f"Pushed {files_pushed} file(s) to {repo_full_name} " f"on branch `{branch}` (commit {commit_sha[:8]})." ), "repo_full_name": repo_full_name, "branch": branch, "commit_sha": commit_sha, "commit_url": f"https://github.com/{repo_full_name}/commit/{commit_sha}", "repo_url": f"https://github.com/{repo_full_name}", "files_pushed": files_pushed, } def _gh_err(message: str, **extras: Any) -> dict[str, Any]: """Build a standard error response for push_to_github.""" result: dict[str, Any] = { "success": False, "message": message, "error": message, } result.update(extras) return result