sonicoder / code /tools /github.py
R-Kentaren's picture
fix: agent_run param mismatch (send agent_name) + add GitHub push-update (3 inputs: repo name, token, username; --force-with-lease)
b87f702 verified
Raw
History Blame Contribute Delete
21.9 kB
"""GitHub import tool — clone a repo into the sandboxed workspace.
This module lets the user (or the agent, via the `/github` slash command)
import an existing GitHub repository into the SoniCoder workspace so the
agent can read, edit, and build on top of real code instead of starting
from scratch.
Security model
--------------
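* Only github.com URLs are accepted (no file://, no other hosts). The SSH
form `git@github.com:owner/repo.git` is recognized, but it is always
rewritten to `https://github.com/...` before cloning, so git never uses the
SSH transport. This prevents the clone URL from being used to exfiltrate
data via a malicious git config.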
* The repo is cloned into a temp directory first, then *copied* into the
workspace root with `.git/`, `node_modules/`, `__pycache__/`, and other
heavy / unnecessary directories stripped. This:
- avoids polluting the workspace with a `.git` subdir the agent would
otherwise try to walk,
- caps the size of the import (clone-depth 1 + ignored dirs),
- sidesteps git's "destination exists" failure mode.
* Optional `subdir` argument lets the user import only a sub-directory of
the repo (e.g. `examples/quickstart`). This is useful for monorepos.
The function returns a JSON-serializable dict that the frontend / agent
loop can consume directly.
"""
from __future__ import annotations
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Any
from code.tools.fs import get_workspace_root
# ─── URL validation ──────────────────────────────────────────────────────
# Accept:
# https://github.com/owner/repo
# https://github.com/owner/repo.git
# https://github.com/owner/repo/tree/main
# https://github.com/owner/repo/tree/main/subdir
# https://github.com/owner/repo/branches/main/subdir (older form)
# git@github.com:owner/repo.git → rewritten to https
#
# HTTPS pattern groups:
#   owner, repo  (named)      — the first two path segments; `repo` is matched
#                               lazily so an optional trailing `.git` is not
#                               captured as part of the name
#   group 3      (positional) — the single segment after /tree/, /blob/ or
#                               /branches/ (the branch or tag)
#   group 4      (positional) — everything after the branch (the sub-path)
# A trailing slash is tolerated and matching is case-insensitive.
_GITHUB_HTTPS = re.compile(
r"^https://github\.com/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+?)(?:\.git)?(?:/(?:tree|blob|branches)/([^/\s]+)(?:/(.*))?)?/?$",
re.IGNORECASE,
)
# scp-style SSH form: only the named `owner` and `repo` groups are captured;
# it carries no branch or sub-path information.
_GITHUB_SSH = re.compile(
r"^git@github\.com:(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+?)(?:\.git)?/?$",
re.IGNORECASE,
)
# Directory names that are pruned when a clone is copied into the workspace:
# they are usually huge and not useful to the agent.
_STRIP_DIRS = {
    # version-control metadata
    ".git", ".hg", ".svn",
    # dependency trees and virtual environments
    "node_modules", ".venv", "venv", "env", ".tox", "Pods",
    # tool caches
    "__pycache__", ".mypy_cache", ".pytest_cache", ".ruff_cache", ".cache", ".gradle",
    # build output
    "dist", "build", ".next", ".nuxt", "target",
}

# File names that must never land in the workspace (OS metadata litter).
_STRIP_FILES = {".DS_Store", "Thumbs.db"}
def _parse_github_url(url: str) -> dict[str, str | None]:
    """Parse a GitHub URL into owner, repo, branch, subdir and clone URL.

    Two shapes are recognized, via the module-level patterns
    `_GITHUB_HTTPS` and `_GITHUB_SSH`:

    * `https://github.com/<owner>/<repo>[.git][/tree|blob|branches/<branch>[/<subdir>]]`
    * `git@github.com:<owner>/<repo>[.git]` (never carries branch or subdir)

    Whichever form is given, `clone_url` is always the HTTPS URL.

    Args:
        url: The URL to parse. Surrounding whitespace is ignored; `None` is
            treated like an empty string.

    Returns:
        dict with keys `owner`, `repo`, `branch` (None when the URL names no
        branch), `subdir` ("" when absent, never with leading/trailing
        slashes) and `clone_url`.

    Raises:
        ValueError: if the URL is empty, matches neither form, uses `.` or
            `..` as owner/repo, or embeds a sub-directory containing a `..`
            segment. The last two are rejected because callers join these
            values onto filesystem paths.
    """
    url = (url or "").strip()
    if not url:
        raise ValueError("Empty GitHub URL")

    https_match = _GITHUB_HTTPS.match(url)
    ssh_match = None if https_match else _GITHUB_SSH.match(url)

    if https_match:
        owner = https_match.group("owner")
        repo = https_match.group("repo")
        branch = https_match.group(3)
        subdir = (https_match.group(4) or "").strip("/")
    elif ssh_match:
        owner = ssh_match.group("owner")
        repo = ssh_match.group("repo")
        branch = None
        subdir = ""
    else:
        raise ValueError(
            f"Invalid GitHub URL: {url!r}. "
            "Expected https://github.com/<owner>/<repo> or git@github.com:<owner>/<repo>.git"
        )

    # `[^/\s]+` happily matches "." and "..", which are path navigation, not
    # names.
    if owner in (".", "..") or repo in (".", ".."):
        raise ValueError(
            f"Invalid GitHub URL: {url!r}. Owner and repo must not be '.' or '..'"
        )

    # A `..` segment would let the sub-path climb out of the cloned tree once
    # it is joined onto the clone directory.
    if ".." in re.split(r"[\\/]", subdir):
        raise ValueError(
            f"Invalid GitHub URL: {url!r}. "
            "The sub-directory must not contain '..' segments"
        )

    return {
        "owner": owner,
        "repo": repo,
        "branch": branch,
        "subdir": subdir,
        "clone_url": f"https://github.com/{owner}/{repo}.git",
    }
def _git_available() -> bool:
    """Check whether a usable `git` binary is on PATH.

    Runs `git --version` with a 5 second timeout.

    Returns:
        True only if the command could be started and exited with status 0.
        False if it is missing, cannot be executed, or timed out.
    """
    try:
        result = subprocess.run(
            ["git", "--version"],
            capture_output=True,
            timeout=5,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError (no git at all) as well as
        # PermissionError and similar (a `git` entry exists on PATH but
        # cannot be executed). All of these mean "not available".
        return False
    return result.returncode == 0
def _safe_relpath(root: str, full: str) -> str:
    """Express `full` relative to `root`.

    Thin wrapper around `os.path.relpath` with the argument order swapped.
    When both paths name the same directory the result is ``"."``.
    """
    relative = os.path.relpath(full, start=root)
    return relative
def _copy_filtered(src: str, dst: str) -> tuple[int, int]:
    """Copy the directory `src` into `dst`, skipping _STRIP_DIRS/_STRIP_FILES.

    The tree is walked top-down so that stripped directories are pruned
    before they are descended into. Files that cannot be copied (sockets,
    fifos, permission errors, ...) are skipped rather than aborting the copy.

    Symbolic links get special treatment because `src` normally comes from
    an untrusted clone:

    * a link whose target resolves inside `src` is copied as a regular file
      containing the target's content;
    * a link whose target resolves outside `src` is skipped, so a repo
      cannot pull host files (e.g. a link to `/etc/passwd`) into `dst`.

    Symlinked directories are not descended into (`os.walk` default).

    Args:
        src: Directory to copy from.
        dst: Directory to copy into; created if missing.

    Returns:
        (files_copied, dirs_skipped) — the number of files written and the
        number of directories pruned because their name is in _STRIP_DIRS.
    """
    files_copied = 0
    dirs_skipped = 0
    src_real = os.path.realpath(src)

    for dirpath, dirnames, fnames in os.walk(src, topdown=True):
        # Mutate dirnames in place so os.walk does not descend into them.
        kept = [d for d in dirnames if d not in _STRIP_DIRS]
        dirs_skipped += len(dirnames) - len(kept)
        dirnames[:] = kept

        rel = _safe_relpath(src, dirpath)
        target_dir = dst if rel == "." else os.path.join(dst, rel)
        os.makedirs(target_dir, exist_ok=True)

        for fname in fnames:
            if fname in _STRIP_FILES:
                continue
            source_file = os.path.join(dirpath, fname)

            if os.path.islink(source_file):
                # copy2 follows links, so check where this one really points
                # before reading through it.
                try:
                    stays_inside = (
                        os.path.commonpath(
                            [src_real, os.path.realpath(source_file)]
                        )
                        == src_real
                    )
                except ValueError:
                    # e.g. target on another drive — definitely not inside.
                    stays_inside = False
                if not stays_inside:
                    continue

            try:
                shutil.copy2(source_file, os.path.join(target_dir, fname))
            except (OSError, shutil.SpecialFileError):
                # Skip files we can't copy (sockets, fifos, etc.)
                continue
            files_copied += 1

    return files_copied, dirs_skipped
# ─── Public API ──────────────────────────────────────────────────────────
def _clean_relative(path: str) -> str | None:
    """Normalize a user-supplied relative path.

    Returns "" when the path is blank or names the base directory itself,
    None when it is absolute or climbs out through `..`, and the normalized
    relative path otherwise.
    """
    trimmed = (path or "").strip().strip("/")
    if not trimmed:
        return ""
    normalized = os.path.normpath(trimmed)
    if normalized == os.curdir:
        return ""
    climbs_out = normalized == os.pardir or normalized.startswith(os.pardir + os.sep)
    if climbs_out or os.path.isabs(normalized):
        return None
    return normalized


def _is_inside(base: str, candidate: str) -> bool:
    """Return True when `candidate` resolves (symlinks included) to `base` or below."""
    base_real = os.path.realpath(base)
    try:
        return os.path.commonpath([base_real, os.path.realpath(candidate)]) == base_real
    except ValueError:
        # Raised e.g. for paths on different drives — treat as outside.
        return False


def _clear_directory(path: str) -> None:
    """Best-effort removal of everything inside `path`; the directory itself stays."""
    for entry in os.listdir(path):
        full = os.path.join(path, entry)
        try:
            # rmtree refuses to operate on a symlink, so links (even to
            # directories) are unlinked like plain files.
            if os.path.isdir(full) and not os.path.islink(full):
                shutil.rmtree(full)
            else:
                os.remove(full)
        except OSError:
            # Don't fail the whole import over a single entry.
            continue


def import_github_repo(
    url: str,
    branch: str = "",
    subdir: str = "",
    target_subdir: str = "",
    depth: int = 1,
    timeout: int = 120,
) -> dict[str, Any]:
    """Clone a GitHub repo into the workspace.

    The repo is shallow-cloned into a temporary directory. Only once the
    clone has succeeded and the requested sub-directory has been validated
    is the destination cleared and the filtered copy made, so a failed
    import leaves the existing workspace content untouched.

    Args:
        url: GitHub URL (https or SSH form). May include /tree/<branch>/<subdir>.
        branch: Optional branch/tag to checkout. If empty, uses the URL's
            embedded branch (if any) or the repo's default branch.
        subdir: Optional sub-directory inside the repo to import. If given,
            it takes precedence over a sub-directory embedded in the URL.
            Must stay inside the repo: absolute results and `..` escapes
            are rejected.
        target_subdir: Where inside the workspace to place the import. If
            empty, places at workspace root. If non-empty and the
            directory exists, its content is replaced. Must resolve to a
            location strictly inside the workspace.
        depth: Git clone depth. Default 1 (shallow).
        timeout: Git clone timeout in seconds.

    Returns:
        dict with keys:
            success: bool
            message: str
            url: original URL
            owner, repo, branch, subdir: parsed (success only)
            files_imported: int (success only)
            dirs_skipped: int (success only)
            workspace_path: relative path inside the workspace where files
                landed, "." for the workspace root (success only)
            tree_preview: list of up to 30 top-level entry names in the
                destination, directories suffixed with "/" (success only)
            error: str (on failure)
    """

    def failure(message: str, error: str) -> dict[str, Any]:
        # Every failure response has the same four keys.
        return {"success": False, "message": message, "url": url, "error": error}

    try:
        parsed = _parse_github_url(url)
    except ValueError as exc:
        return failure(str(exc), str(exc))

    # Resolve effective branch / subdir: explicit arguments win over the URL.
    effective_branch = (branch or "").strip() or parsed.get("branch") or ""
    requested_subdir = (subdir or "").strip() or parsed.get("subdir") or ""
    effective_subdir = _clean_relative(requested_subdir)
    if effective_subdir is None:
        return failure(f"Invalid subdir: {requested_subdir!r}", "invalid subdir")

    if not _git_available():
        return failure(
            "`git` is not installed in this environment. Cannot clone.",
            "git binary not found",
        )

    workspace_root = get_workspace_root()
    os.makedirs(workspace_root, exist_ok=True)

    # Decide destination inside workspace
    if (target_subdir or "").strip():
        # Sanitize target_subdir — no path escapes. The lexical check rejects
        # `..` climbing; the resolved check additionally catches a symlink
        # inside the workspace that points elsewhere. This matters because
        # the destination's content is deleted further down.
        target_rel = _clean_relative(target_subdir)
        if not target_rel or not _is_inside(
            workspace_root, os.path.join(workspace_root, target_rel)
        ):
            return failure(
                f"Invalid target_subdir: {target_subdir!r}",
                "invalid target_subdir",
            )
        dest_root = os.path.join(workspace_root, target_rel)
        workspace_rel = target_rel
    else:
        dest_root = workspace_root
        workspace_rel = ""

    # Clone into a temp dir
    with tempfile.TemporaryDirectory(prefix="sonicoder_gh_") as tmp:
        # Fixed directory name: the repo name comes from user input and must
        # not influence where the clone lands.
        clone_target = os.path.join(tmp, "repo")
        cmd = ["git", "clone", "--depth", str(int(depth)), "--single-branch"]
        if effective_branch:
            cmd.extend(["--branch", effective_branch])
        # `--` ends option parsing before the positional arguments.
        cmd.extend(["--", parsed["clone_url"], clone_target])

        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                timeout=timeout,
                text=True,
                # Fail fast instead of waiting on a credential prompt when
                # the repo is private or does not exist.
                env={**os.environ, "GIT_TERMINAL_PROMPT": "0"},
            )
        except subprocess.TimeoutExpired:
            return failure(f"Git clone timed out after {timeout}s", "clone_timeout")

        if proc.returncode != 0:
            stderr = (proc.stderr or "").strip()
            return failure(f"git clone failed: {stderr[:500]}", stderr[:500])

        if not os.path.isdir(clone_target):
            return failure(
                "git clone appeared to succeed but the target directory is missing",
                "clone_dir_missing",
            )

        # If subdir is requested, validate it
        src_root = clone_target
        if effective_subdir:
            candidate = os.path.join(clone_target, effective_subdir)
            # The containment check also rejects a symlinked directory in
            # the repo that points outside the clone.
            if not os.path.isdir(candidate) or not _is_inside(clone_target, candidate):
                return failure(
                    f"Subdirectory '{effective_subdir}' not found in repo "
                    f"{parsed['owner']}/{parsed['repo']}",
                    "subdir_not_found",
                )
            src_root = candidate

        # Everything that can fail cheaply has been checked; only now clear
        # existing destination content so the import is clean.
        # (We never touch anything outside dest_root.)
        if os.path.isdir(dest_root):
            _clear_directory(dest_root)

        # Copy filtered files into dest_root
        files_imported, dirs_skipped = _copy_filtered(src_root, dest_root)

    # Build a brief preview (top level only, first 30 entries) for the response
    tree_summary: list[str] = []
    try:
        for name in sorted(os.listdir(dest_root))[:30]:
            is_dir = os.path.isdir(os.path.join(dest_root, name))
            tree_summary.append(f"{name}/" if is_dir else name)
    except OSError:
        pass

    msg = (
        f"Imported {files_imported} file(s) from {parsed['owner']}/{parsed['repo']}"
        + (f" (branch: {effective_branch})" if effective_branch else "")
        + (f" subdir: {effective_subdir}" if effective_subdir else "")
        + (f" into {workspace_rel}/" if workspace_rel else " into workspace root")
        + f". Skipped {dirs_skipped} heavy dir(s) (.git, node_modules, etc.)."
    )
    return {
        "success": True,
        "message": msg,
        "url": url,
        "owner": parsed["owner"],
        "repo": parsed["repo"],
        "branch": effective_branch or None,
        "subdir": effective_subdir or None,
        "files_imported": files_imported,
        "dirs_skipped": dirs_skipped,
        "workspace_path": workspace_rel or ".",
        "tree_preview": tree_summary,
    }
def list_github_url_examples() -> dict[str, Any]:
    """Describe the GitHub URL formats that import_github_repo accepts.

    Meant for help text in the UI / agent. The result is a static dict with
    `success` (always True), `examples` (sample URLs) and `notes`
    (human-readable remarks about how imports behave).
    """
    sample_urls = [
        "https://github.com/owner/repo",
        "https://github.com/owner/repo.git",
        "https://github.com/owner/repo/tree/main",
        "https://github.com/owner/repo/tree/main/examples/quickstart",
        "git@github.com:owner/repo.git",
    ]
    remarks = [
        "Only github.com URLs are accepted (HTTPS or SSH form).",
        "Shallow clone (depth=1) is used by default to keep imports fast.",
        "Directories like .git, node_modules, __pycache__, .venv, dist, build are stripped.",
        "If a /tree/<branch>/<subdir> path is included, only that subdir is imported.",
    ]
    payload: dict[str, Any] = {"success": True}
    payload["examples"] = sample_urls
    payload["notes"] = remarks
    return payload
# ─── Push workspace to GitHub ────────────────────────────────────────────
def push_to_github(
    repo_name: str,
    github_token: str,
    username: str,
    branch: str = "main",
    commit_message: str = "",
    timeout: int = 120,
) -> dict[str, Any]:
    """Push the current SoniCoder workspace to a GitHub repo.

    Workflow:
    1. Snapshot the workspace (call `snapshot_workspace()`).
    2. Create a fresh git repo in a temp dir, write the files in.
    3. `git commit -m <message>`.
    4. `git ls-remote` to read the current tip of the target branch.
    5. `git push --force-with-lease=<ref>:<tip>` over token-authenticated
       HTTPS.

    The temp repo is brand new, so it has no remote-tracking refs. A bare
    `--force-with-lease` would therefore expect the remote branch not to
    exist and reject every push to an existing branch as "stale info".
    Passing the tip we just observed as the explicit lease value makes the
    new commit replace the remote tip, while still refusing the push if
    someone else moves the branch between the lookup and the push. This
    matches the SoniCoder mental model: "the workspace IS the source of
    truth; overwrite whatever's on GitHub with my latest".

    Parameters
    ----------
    repo_name : str
        Repo name. Either "repo" (combined with `username` as
        `username/repo`) or "owner/repo" (owner taken from the name). A
        trailing ".git" is dropped. Owner and repo may only contain
        letters, digits, `.`, `_` and `-`.
    github_token : str
        GitHub Personal Access Token (PAT) with `repo` scope.
    username : str
        GitHub username (or org) to push as. Required.
    branch : str
        Target branch (default "main"). Will be created on push if missing.
    commit_message : str
        Commit message. Defaults to a timestamped message.
    timeout : int
        Per-git-command timeout in seconds (default 120).

    Returns
    -------
    dict with keys:
        success, message, repo_full_name, branch, commit_sha,
        commit_url, repo_url, files_pushed, error (on failure)
    """
    import datetime
    from urllib.parse import quote

    # ── Validate inputs ──────────────────────────────────────────────
    repo_name = (repo_name or "").strip()
    github_token = (github_token or "").strip()
    username = (username or "").strip()
    branch = (branch or "main").strip() or "main"
    if not commit_message:
        commit_message = (
            f"Update from SoniCoder at "
            f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )
    if not repo_name:
        return _gh_err("Repo name is required.")
    if not github_token:
        return _gh_err("GitHub token is required.")
    if not username:
        return _gh_err("Username (or org) is required.")

    # Normalize repo_name into owner/repo
    if "/" in repo_name:
        owner, name = repo_name.split("/", 1)
        owner = owner.strip() or username
        name = name.strip()
    else:
        owner, name = username, repo_name
    if name.endswith(".git"):
        name = name[: -len(".git")]

    # Both parts end up in a URL, so restrict them to a conservative
    # character set. This also rejects pasted URLs, extra slashes and `..`.
    is_plain_name = re.compile(r"[A-Za-z0-9._-]+").fullmatch
    if name in ("", ".", "..") or not is_plain_name(name):
        return _gh_err(f"Invalid repo_name: {repo_name!r}")
    if owner in (".", "..") or not is_plain_name(owner):
        return _gh_err(f"Invalid repo owner: {owner!r}")
    repo_full_name = f"{owner}/{name}"

    if not _git_available():
        return _gh_err("`git` is not installed in this environment. Cannot push.")

    # ── Snapshot workspace ───────────────────────────────────────────
    # presumably a mapping of workspace-relative path -> text content;
    # verify against code.tools.fs
    from code.tools.fs import snapshot_workspace

    files = snapshot_workspace()
    if not files:
        return _gh_err(
            "Workspace is empty. Generate or import some code first."
        )

    # Build the push URL with the credentials embedded (HTTPS basic auth).
    # Both parts are percent-encoded so characters such as "@" or "/" cannot
    # change the meaning of the URL.
    # NOTE(review): the URL is passed on git's command line and is therefore
    # visible to other local processes while git runs.
    public_url = f"https://github.com/{repo_full_name}.git"
    quoted_user = quote(username, safe="")
    quoted_token = quote(github_token, safe="")
    push_url = f"https://{quoted_user}:{quoted_token}@github.com/{repo_full_name}.git"
    remote_ref = f"refs/heads/{branch}"
    # Never block on an interactive credential prompt.
    git_env = {**os.environ, "GIT_TERMINAL_PROMPT": "0"}

    def _redact(text: str) -> str:
        # Replace the whole credentialed URL first: once the token inside it
        # has been masked, the URL itself would no longer match.
        text = (text or "").replace(push_url, public_url)
        return text.replace(quoted_token, "***").replace(github_token, "***")

    def _push_failure(err: str, commit_sha: str) -> dict[str, Any]:
        return _gh_err(
            f"git push failed: {err[:400]}. "
            "Check that the token has `repo` scope and the repo exists.",
            repo_full_name=repo_full_name,
            branch=branch,
            commit_sha=commit_sha,
        )

    # ── Set up a fresh git repo in temp dir ──────────────────────────
    with tempfile.TemporaryDirectory(prefix="sonicoder_gh_push_") as tmp:
        # Fixed directory name: user input must not decide where we write.
        repo_dir = os.path.join(tmp, "repo")
        os.makedirs(repo_dir, exist_ok=True)

        def _run_git(args: list[str], cwd: str = repo_dir) -> tuple[int, str, str]:
            # Returns (returncode, stdout, stderr); stderr is always redacted
            # so no caller can leak the token by accident.
            try:
                proc = subprocess.run(
                    ["git", *args],
                    cwd=cwd,
                    capture_output=True,
                    text=True,
                    timeout=timeout,
                    env=git_env,
                )
            except subprocess.TimeoutExpired:
                return 124, "", _redact(
                    f"git {' '.join(args)} timed out after {timeout}s"
                )
            return proc.returncode, proc.stdout, _redact(proc.stderr)

        # Init repo
        rc, _, err = _run_git(["init", "-b", branch])
        if rc != 0:
            # Older git doesn't support -b; fall back
            rc, _, err = _run_git(["init"])
            if rc != 0:
                return _gh_err(f"git init failed: {err}")
            # Then checkout/create the branch
            _run_git(["checkout", "-b", branch])

        # Set committer identity (required for commit)
        _run_git(["config", "user.email", f"{username}@users.noreply.github.com"])
        _run_git(["config", "user.name", username])

        # Write all snapshot files into the repo dir
        for rel_path, content in files.items():
            normalized = os.path.normpath(rel_path)
            first_part = normalized.split(os.sep, 1)[0]
            # Safety: skip absolute paths, anything that climbs out of the
            # repo dir, and anything aimed at the repo's own .git directory
            # (which could replace git's config or hooks).
            if (
                os.path.isabs(normalized)
                or first_part in (os.curdir, os.pardir)
                or first_part.lower() == ".git"
            ):
                continue
            target = os.path.join(repo_dir, normalized)
            try:
                os.makedirs(os.path.dirname(target), exist_ok=True)
                with open(target, "w", encoding="utf-8") as f:
                    f.write(content)
            except (OSError, UnicodeEncodeError):
                # Best effort: one unwritable file must not abort the push.
                continue

        # Stage everything
        rc, _, err = _run_git(["add", "-A"])
        if rc != 0:
            return _gh_err(f"git add failed: {err}")

        # Check if there's anything to commit
        rc, out, _ = _run_git(["status", "--porcelain"])
        if not out.strip():
            return {
                "success": True,
                "message": "No changes to commit (workspace matches last commit).",
                "repo_full_name": repo_full_name,
                "branch": branch,
                "commit_sha": None,
                "commit_url": None,
                "repo_url": f"https://github.com/{repo_full_name}",
                "files_pushed": 0,
            }

        # Commit
        rc, out, err = _run_git(["commit", "-m", commit_message])
        if rc != 0:
            return _gh_err(f"git commit failed: {err}")

        # Get commit SHA
        rc, sha, _ = _run_git(["rev-parse", "HEAD"])
        commit_sha = sha.strip()

        # Look up the current remote tip of the target branch. No matching
        # line means the branch (or any ref at all) does not exist yet, in
        # which case the empty lease value below means "must not exist".
        rc, listing, err = _run_git(["ls-remote", push_url, remote_ref])
        if rc != 0:
            return _push_failure(err, commit_sha)
        remote_tip = ""
        for line in listing.splitlines():
            tip, _, ref = line.partition("\t")
            if ref.strip() == remote_ref:
                remote_tip = tip.strip()
                break

        # Push HEAD rather than the local branch name so the push does not
        # depend on the `checkout -b` fallback above having succeeded; the
        # full-ref refspec also can never be mistaken for an option.
        rc, out, err = _run_git(
            [
                "push",
                f"--force-with-lease={remote_ref}:{remote_tip}",
                push_url,
                f"HEAD:{remote_ref}",
            ]
        )
        if rc != 0:
            return _push_failure(err, commit_sha)

        # Count files pushed
        rc, count_out, _ = _run_git(["ls-files"])
        files_pushed = sum(1 for line in count_out.splitlines() if line.strip())

        return {
            "success": True,
            "message": (
                f"Pushed {files_pushed} file(s) to {repo_full_name} "
                f"on branch `{branch}` (commit {commit_sha[:8]})."
            ),
            "repo_full_name": repo_full_name,
            "branch": branch,
            "commit_sha": commit_sha,
            "commit_url": f"https://github.com/{repo_full_name}/commit/{commit_sha}",
            "repo_url": f"https://github.com/{repo_full_name}",
            "files_pushed": files_pushed,
        }
def _gh_err(message: str, **extras: Any) -> dict[str, Any]:
    """Assemble the failure payload returned by push_to_github.

    `message` is reported under both the `message` and `error` keys and
    `success` is False. Keyword arguments are merged in afterwards, so they
    add context fields and override a standard key of the same name.
    """
    return {"success": False, "message": message, "error": message, **extras}