sonicoder / code /server /routes.py
R-Kentaren's picture
fix(oauth): use gr.OAuthToken for hf_auth + remove hf_oauth from README to resolve secret collision and Gradio type-hint ValueError
82b44e2 verified
Raw
History Blame Contribute Delete
43.2 kB
"""FastAPI / Gradio Server routes.
Defines all HTTP and API endpoints:
- GET /                    → serves the index.html frontend
- GET /api/model-status    → model loading status
- GET /images/{f}          → serve generated plot images
- GET /download/{f}        → serve project ZIP downloads
- API web_search           → Google search scraping
- API chat                 → streaming chat with code execution
- API push_hf              → push to HuggingFace Hub
- API switch_model         → switch between loaded models
- API upload_image         → upload image for VLM inference
- API hf_auth              → get HF OAuth profile & organizations
- API agent_run            → Claude Code-style agent loop with tools
- API list_skills          → list available skills
- API list_commands        → list available slash commands
- API list_hooks           → list configured hooks
- API workspace_tree       → list workspace files
- API workspace_read       → read a workspace file
- API workspace_write      → write a workspace file
- API workspace_bash       → run a bash command in workspace
- API todo_read            → read current todo list
- API todo_write           → update todo list
- API import_github        → clone a GitHub repo into the workspace
- API github_url_examples  → return accepted GitHub URL formats
- API push_github          → push the current workspace to a GitHub repo
"""
from __future__ import annotations
import base64
import json
import logging
import os
import tempfile
from pathlib import Path
from typing import Any, Optional
import gradio as gr
from fastapi.responses import HTMLResponse, FileResponse
try:
    from gradio import Server
except ImportError:
    # Fallback for older/newer Gradio versions where Server may not be exposed
    # at the top level. We provide a minimal shim so the module can still be
    # imported for testing purposes.
    class Server:  # type: ignore
        """Minimal stand-in for the Gradio ``Server`` when it cannot be imported.

        Only the two decorators used by this module are provided: ``get``
        (delegated to a private FastAPI app) and ``api`` (which merely tags
        the function and does not register it anywhere).
        """

        def __init__(self, *args, **kwargs):
            # Imported lazily so FastAPI is only needed when the shim is
            # actually instantiated.
            from fastapi import FastAPI

            self._fastapi = FastAPI()

        def get(self, path: str, **kwargs):
            """Return the FastAPI GET-route decorator for ``path``."""
            return self._fastapi.get(path, **kwargs)

        def api(self, name: Optional[str] = None, concurrency_limit: int = 1):
            """Return a decorator that records the API metadata on the function.

            The decorated function is returned unchanged apart from the
            ``_api_name`` and ``_concurrency_limit`` attributes.
            """

            def decorator(fn):
                # Store as attributes so they can be inspected.
                fn._api_name = name
                fn._concurrency_limit = concurrency_limit
                return fn

            return decorator
from code.config.constants import (
APP_TITLE,
DEFAULT_MODEL_KEY,
EXAMPLE_PROMPTS,
LANGUAGE_OPTIONS,
MODEL_CONFIGS,
MODEL_URL,
PY_TIMEOUT_S,
)
from code.execution.code_extractor import (
build_iframe,
extract_code,
extract_multi_file,
is_gradio_code,
normalize_language,
strip_thinking_blocks,
)
from code.execution.gradio_runner import run_gradio_app, stop_gradio_app
from code.execution.python_runner import run_python
from code.huggingface.push import create_project_zip, push_to_huggingface
from code.model.loader import (
get_model_status,
is_model_loaded,
get_current_model_key,
get_current_model_type,
switch_model,
)
from code.model.inference import call_model
from code.server.chat_helpers import chat_history_to_messages, targeted_prompt
from code.websearch.google_scraper import web_search_google, format_search_results
# Module-level logger used by the handlers below for failure reporting.
logger = logging.getLogger(__name__)
# ─── Served Files Registry ──────────────────────────────────────────────
# Maps a namespaced key to a path on disk. Keys are "img:<filename>" for
# generated plot images and "dl:<filename>" for downloadable artifacts; the
# /images/{filename} and /download/{filename} routes look entries up here.
# Nothing in this module removes entries once they are registered.
_served_files: dict[str, str] = {}
# ─── Uploaded Images Registry ───────────────────────────────────────────
# Maps an upload's image ID to the path of the saved file; filled by the
# upload_image API and read by the /uploaded-images/{image_id} route.
_uploaded_images: dict[str, str] = {}
# ─── Server Instance ────────────────────────────────────────────────────
# Single server object on which every route and API below is registered.
app = Server()
# ─── HTTP Routes ────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def homepage():
    """Return index.html with the runtime configuration JSON substituted in.

    The ``__RUNTIME_CONFIG__`` placeholder in the page is replaced with a JSON
    document describing models, languages, examples, skills, commands, hooks
    and agents. Each optional subsystem falls back to an empty value when it
    cannot be imported or queried.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    index_file = os.path.join(here, "..", "..", "index.html")
    with open(index_file, "r", encoding="utf-8") as handle:
        page = handle.read()

    # Optional subsystems: a failure in any of them must not break the page.
    try:
        from code.skills import list_skills

        available_skills = list_skills()
    except Exception:
        available_skills = []

    try:
        from code.commands import list_commands

        available_commands = list_commands()
    except Exception:
        available_commands = []

    try:
        from code.hooks import list_hooks

        configured_hooks = list_hooks()
    except Exception:
        configured_hooks = []

    try:
        from code.agents import list_agents, get_active_agent

        known_agents = list_agents()
        current_agent = get_active_agent()
    except Exception:
        known_agents = []
        current_agent = None

    # Only the display fields of each model config are exposed to the client.
    model_summaries = {}
    for key, cfg in MODEL_CONFIGS.items():
        model_summaries[key] = {
            "name": cfg["name"],
            "type": cfg["type"],
            "description": cfg["description"],
        }

    example_entries = []
    for label, example_prompt, lang, fw in EXAMPLE_PROMPTS:
        example_entries.append(
            {"label": label, "prompt": example_prompt, "language": lang, "framework": fw}
        )

    runtime_config = {
        "app_title": APP_TITLE,
        "model_id": MODEL_CONFIGS[DEFAULT_MODEL_KEY]["id"],
        "model_configs": model_summaries,
        "model_url": MODEL_URL,
        "languages": LANGUAGE_OPTIONS,
        "examples": example_entries,
        # NOTE(review): hard-coded rather than derived from DEFAULT_MODEL_KEY;
        # confirm this is intentional.
        "default_model": "minicpm5-1b",
        "skills": available_skills,
        "commands": available_commands,
        "hooks": configured_hooks,
        "agents": known_agents,
        "active_agent": current_agent,
    }
    return page.replace("__RUNTIME_CONFIG__", json.dumps(runtime_config))
@app.get("/api/model-status")
async def model_status_endpoint():
    """Report the model loader's current status, as given by ``get_model_status``."""
    status = get_model_status()
    return status
@app.get("/images/{filename}")
async def serve_image(filename: str):
    """Serve a registered plot image as PNG, or a 404 page if it is unknown.

    Only files registered under the ``img:`` namespace of ``_served_files``
    and still present on disk are served.
    """
    registered = _served_files.get(f"img:{filename}")
    if not registered or not os.path.exists(registered):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(registered, media_type="image/png")
@app.get("/download/{filename}")
async def serve_download(filename: str):
    """Serve a registered download as an attachment, or a 404 page if unknown.

    Only files registered under the ``dl:`` namespace of ``_served_files``
    and still present on disk are served.
    """
    registered = _served_files.get(f"dl:{filename}")
    if not registered or not os.path.exists(registered):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(
        registered,
        filename=filename,
        media_type="application/octet-stream",
    )
@app.get("/uploaded-images/{image_id}")
async def serve_uploaded_image(image_id: str):
    """Serve a previously uploaded image by ID, or a 404 page if unknown.

    The file is always sent with the ``image/png`` media type.
    """
    stored = _uploaded_images.get(image_id)
    if not stored or not os.path.exists(stored):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(stored, media_type="image/png")
# ─── Gradio API Endpoints ──────────────────────────────────────────────
@app.api(name="switch_model", concurrency_limit=1)
def handle_switch_model(model_key: str) -> str:
    """Switch the active model and yield the loader's result as one JSON string."""
    yield json.dumps(switch_model(model_key))
@app.api(name="upload_image", concurrency_limit=4)
def handle_upload_image(image_data: str) -> str:
    """Store a base64-encoded image and yield a JSON description of it.

    ``image_data`` may be raw base64 or a ``data:...;base64,...`` URI. On
    success the yielded JSON carries ``image_id``, a server-relative
    ``image_url`` and a ``file://`` URL of the saved file. On any failure it
    carries ``success: False`` and a message.
    """
    try:
        if not image_data:
            yield json.dumps({"success": False, "message": "No image data provided"})
            return

        # Data URIs look like "data:image/png;base64,<payload>"; keep the payload.
        if image_data.startswith("data:"):
            pieces = image_data.split(",", 1)
            if len(pieces) == 2:
                image_data = pieces[-1]

        raw_bytes = base64.b64decode(image_data)

        # Each upload gets its own temp directory; the file is always named *.png.
        target_dir = tempfile.mkdtemp(prefix="uploaded_img_")
        random_part = int(os.urandom(4).hex(), 16)
        image_id = f"img_{os.getpid()}_{random_part}"
        saved_path = os.path.join(target_dir, f"{image_id}.png")
        Path(saved_path).write_bytes(raw_bytes)

        # Register so /uploaded-images/{image_id} can serve it.
        _uploaded_images[image_id] = saved_path

        response = {
            "success": True,
            "image_id": image_id,
            "image_url": f"/uploaded-images/{image_id}",
            # file:// form for local VLM access.
            "file_url": f"file://{saved_path}",
            "message": "Image uploaded successfully",
        }
        yield json.dumps(response)
    except Exception as exc:
        logger.exception("Image upload failed")
        yield json.dumps({
            "success": False,
            "message": f"Upload failed: {str(exc)}",
        })
@app.api(name="web_search", concurrency_limit=4)
def handle_web_search(query: str) -> str:
    """Run a Google-scrape web search and yield the outcome as one JSON string.

    A blank query yields a failure payload without searching. Any exception
    from the search is logged and reported as a failure payload.
    """
    cleaned = (query or "").strip()
    if not cleaned:
        yield json.dumps({"success": False, "results": [], "message": "Empty search query"})
        return

    try:
        hits = web_search_google(cleaned, num_results=8)
        rendered = format_search_results(hits)
        payload = {
            "success": True,
            "results": hits,
            "formatted": rendered,
            "message": f"Found {len(hits)} results",
        }
        yield json.dumps(payload)
    except Exception as exc:
        logger.exception("Web search failed")
        yield json.dumps({
            "success": False,
            "results": [],
            "message": f"Search failed: {str(exc)}",
        })
@app.api(name="chat", concurrency_limit=2)
def handle_chat(
    prompt: str,
    target_language: str,
    target_framework: str,
    history_json: str,
    exec_context_json: str,
    search_enabled: str = "false",
    image_url: str = "",
) -> str:
    """Stream one chat turn: generate, extract code, run it, report the result.

    Args:
        prompt: The user's message. A blank prompt yields a single error event.
        target_language: Requested language, reconciled with the fence
            language of the generated code via ``normalize_language``.
        target_framework: Requested framework; ``"Gradio"`` forces the Gradio
            runner for Python code.
        history_json: JSON-encoded chat history (empty string means no history).
        exec_context_json: JSON-encoded previous execution context (empty
            string means none).
        search_enabled: ``"true"`` (case-insensitive) to run a web search
            first and pass the results to the model.
        image_url: Optional image URL forwarded to the model.

    Yields:
        JSON strings. Every event has ``type`` (``status``, ``search_results``,
        ``streaming``, ``complete`` or ``error``), ``status_text``,
        ``status_state``, ``history`` and ``execution``.
    """
    history = json.loads(history_json) if history_json else []
    execution_context = json.loads(exec_context_json) if exec_context_json else {}

    def _event(kind: str, status_text: str, status_state: str, **extra) -> str:
        """Serialize one stream event using the current history and context."""
        payload = {
            "type": kind,
            "status_text": status_text,
            "status_state": status_state,
            "history": history,
            "execution": execution_context,
        }
        payload.update(extra)
        return json.dumps(payload)

    prompt = (prompt or "").strip()
    if not prompt:
        yield _event("error", "Enter a prompt to get started.", "info")
        return

    # Refuse to run until the model is ready. "loading" is reported as work in
    # progress, any other non-ready state as an error.
    model_status = get_model_status()
    if model_status["status"] == "loading":
        yield _event("error", model_status["message"], "working")
        return
    if model_status["status"] != "ready":
        yield _event("error", model_status["message"], "error")
        return

    # Add the user message plus a placeholder assistant message that is filled
    # in while the model streams.
    history = list(history) + [
        {"role": "user", "content": prompt},
        {"role": "assistant", "content": ""},
    ]
    yield _event("status", "Thinking...", "working")

    # Optional web search. A search failure is logged and the turn continues
    # without search context, matching the behavior of agent_run.
    search_context = ""
    if search_enabled.lower() == "true":
        yield _event("status", "Searching the web...", "working")
        search_results = []
        try:
            search_results = web_search_google(prompt, num_results=6)
            if search_results:
                search_context = format_search_results(search_results)
        except Exception as exc:
            logger.warning("Web search failed: %s", exc)
            search_results = []
            search_context = ""
        if search_results:
            yield _event(
                "search_results",
                f"Found {len(search_results)} results, generating code...",
                "working",
                search_results=search_results,
            )

    # Build the model input: drop the assistant placeholder and replace the
    # last user message with the fully targeted prompt.
    model_history = list(history[:-1])
    model_history[-1] = {
        "role": "user",
        "content": targeted_prompt(
            prompt, target_language, target_framework, execution_context, search_context
        ),
    }
    messages = chat_history_to_messages(model_history)
    vlm_image_url = image_url.strip() if image_url else None

    final_response = ""
    for partial in call_model(messages, image_url=vlm_image_url):
        final_response = partial
        # Strip thinking blocks so the chat only shows clean output.
        history[-1]["content"] = strip_thinking_blocks(partial)
        yield _event("streaming", "Generating...", "working")

    if not final_response:
        history[-1]["content"] = "The model did not return a response."
        yield _event("error", "No model response.", "error")
        return

    # Extract code from the cleaned response (single block and multi-file).
    clean_response = strip_thinking_blocks(final_response)
    code, fence_lang = extract_code(clean_response)
    target = normalize_language(target_language, fence_lang)
    multi_files = extract_multi_file(clean_response)
    if not code and not multi_files:
        yield _event("complete", "Answered without running code.", "info")
        return

    yield _event("status", "Running...", "working")

    # Execute the code. Only Python is run server-side.
    stdout, stderr, image_path = "", "", None
    status_text, status_state = "Preview ready", "success"
    is_gradio = False
    gradio_url = None
    if target == "python" and code:
        if is_gradio_code(code) or target_framework == "Gradio":
            is_gradio = True
            gradio_result = run_gradio_app(code)
            if gradio_result["success"]:
                gradio_url = gradio_result["url"]
                status_text = f"Gradio app running at {gradio_url}"
                status_state = "success"
                stderr = f"Gradio app launched successfully at {gradio_url}"
            else:
                status_text = "Gradio launch failed"
                status_state = "error"
                stderr = gradio_result.get(
                    "stderr", gradio_result.get("message", "Launch failed")
                )
        else:
            result = run_python(code)
            stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
            if result.timed_out:
                status_text = f"Timed out after {PY_TIMEOUT_S}s"
                status_state = "error"
            elif result.returncode:
                status_text = "Finished with errors"
                status_state = "error"
            else:
                status_text = "Ran successfully"
                status_state = "success"

    # Register the generated image so /images/{filename} can serve it.
    image_url_out = None
    if image_path:
        filename = os.path.basename(image_path)
        _served_files[f"img:{filename}"] = image_path
        image_url_out = f"/images/{filename}"

    # Assemble the project files for download.
    project_files = dict(multi_files) if multi_files else {}
    # Rename main.py → app.py for Python/Gradio projects (HF Spaces expects app.py).
    if project_files and "main.py" in project_files and "app.py" not in project_files:
        if target == "python" or is_gradio:
            project_files["app.py"] = project_files.pop("main.py")
    # A single code block becomes a one-file project.
    if not project_files and code:
        if target == "python":
            fname = "app.py" if (is_gradio or is_gradio_code(code)) else "main.py"
        elif target in {"web", "html", "javascript"}:
            fname = "index.html"
        else:
            fname = f"main.{fence_lang or 'txt'}"
        project_files = {fname: code}

    download_url = None
    if project_files:
        project_name = "generated-project"
        zip_path = create_project_zip(project_files, project_name)
        zip_filename = f"{project_name}.zip"
        # NOTE(review): the registry key is the same for every request, so a
        # later generation replaces what this URL serves; confirm acceptable.
        _served_files[f"dl:{zip_filename}"] = zip_path
        download_url = f"/download/{zip_filename}"

    # Web-previewable output is rendered in the preview tab by the frontend.
    is_web = (
        target in {"web", "javascript", "typescript", "html"}
        or (fence_lang or "") in {"html", "web"}
    )
    execution_context = {
        "code": code,
        "target": target,
        "fence_lang": fence_lang or target,
        "stdout": stdout,
        "stderr": stderr,
        "image_url": image_url_out,
        "image_path": image_path,
        "status": status_text,
        "language": fence_lang or target,
        "suggested_tab": "preview" if (image_path or is_web or is_gradio) else "console",
        "download_url": download_url,
        "project_files": project_files,
        "is_web": is_web,
        "web_code": code if is_web else None,
        "is_gradio": is_gradio,
        "gradio_url": gradio_url,
    }
    yield _event("complete", status_text, status_state)
@app.api(name="hf_auth", concurrency_limit=4)
def handle_hf_auth(
    oauth_token: str = "",
    token: Optional[gr.OAuthToken] = None,
) -> str:
    """Yield the caller's HuggingFace profile and organizations as JSON.

    The access token is resolved in priority order:
      1. the explicit ``oauth_token`` argument;
      2. the ``token`` attribute of the ``gr.OAuthToken`` argument (per the
         original notes, Gradio injects this after the "Sign in with
         HuggingFace" flow);
      3. the ``HF_TOKEN`` environment variable (local dev fallback).

    On success the payload contains ``authenticated: True``, the username,
    display name, avatar URL, organizations (with ``role`` when available)
    and the resolved token. Otherwise it contains ``authenticated: False``
    and a message.
    """

    def _signed_out(message: str) -> str:
        """Serialize the unauthenticated payload with the given message."""
        return json.dumps({
            "authenticated": False,
            "username": "",
            "name": "",
            "picture": "",
            "organizations": [],
            "message": message,
        })

    try:
        from huggingface_hub import whoami

        resolved = oauth_token.strip() if oauth_token else ""

        # ── Fall back to the Gradio-injected OAuthToken (from session) ───
        if not resolved and token is not None:
            try:
                resolved = (getattr(token, "token", "") or "").strip()
            except Exception:
                pass

        # ── Last-resort fallback: HF_TOKEN env var (local dev) ──────────
        # NOTE(review): a token taken from the environment is echoed back to
        # the caller below; confirm this path is only reachable in local dev.
        if not resolved:
            resolved = (os.getenv("HF_TOKEN") or "").strip()

        if not resolved:
            yield _signed_out("Not signed in. Click Sign In to authenticate with HuggingFace.")
            return

        profile = whoami(token=resolved)
        username = profile.get("name", "")
        display_name = profile.get("fullname", username)
        picture = profile.get("avatarUrl", "") or ""

        organizations = [
            {"name": entry.get("name", ""), "avatar": entry.get("avatarUrl", "")}
            for entry in profile.get("orgs", [])
        ]

        # Attach role info from orgRoles to the first organization of that name.
        for role_entry in profile.get("orgRoles", []):
            wanted = role_entry.get("org", "")
            role = role_entry.get("role", "member")
            match = next((o for o in organizations if o["name"] == wanted), None)
            if match is not None:
                match["role"] = role

        yield json.dumps({
            "authenticated": True,
            "username": username,
            "name": display_name,
            "picture": picture,
            "organizations": organizations,
            "token": resolved,
            "message": f"Signed in as {username}",
        })
    except Exception as exc:
        logger.exception("HF auth check failed")
        yield _signed_out(f"Auth check failed: {str(exc)}")
@app.api(name="push_hf", concurrency_limit=1)
def handle_push_hf(
    exec_context_json: str,
    repo_name: str,
    hf_token: str,
    space_sdk: str = "auto",
    is_space: str = "true",
) -> str:
    """Push the generated project to the HuggingFace Hub.

    Args:
        exec_context_json: JSON-encoded execution context; ``project_files``
            is used when present, otherwise a single file is built from ``code``.
        repo_name: Target repository, optionally prefixed with an owner
            (``owner/name``); the last path segment is used as the project name.
        hf_token: Access token passed through to ``push_to_huggingface``.
        space_sdk: Space SDK, or ``"auto"``. Forced to ``"gradio"`` when a
            single-file project is detected as a Gradio app.
        is_space: ``"true"`` (case-insensitive) to push as a Space.

    Yields:
        One JSON string: the result of ``push_to_huggingface``, or a failure
        payload with ``success``, ``message`` and ``url``.
    """
    try:
        execution_context = json.loads(exec_context_json) if exec_context_json else {}
        project_files = dict(execution_context.get("project_files", {}) or {})
        code = execution_context.get("code", "")

        # No multi-file project: wrap the single code block in an entry file.
        if not project_files and code:
            lang = execution_context.get("language", "python")
            is_gradio = execution_context.get("is_gradio", False)
            if lang in ("javascript", "js", "typescript", "ts"):
                # JS/TS "code" that is really an HTML document stays index.html.
                lowered = code.lower()
                if "<!doctype" in lowered or "<html" in lowered:
                    filename = "index.html"
                else:
                    filename = "index.js"
            elif lang in ("html", "web"):
                filename = "index.html"
            else:
                # Python and every unrecognized language use app.py.
                filename = "app.py"
            project_files = {filename: code}
            # Auto-detect the SDK for Gradio apps.
            if is_gradio or is_gradio_code(code):
                space_sdk = "gradio"

        if not project_files:
            yield json.dumps({
                "success": False,
                "message": "No code to push. Generate some code first.",
                "url": "",
            })
            return

        # "auto" is passed on as "static"; per the original note,
        # push_to_huggingface auto-detects the SDK from the files.
        if space_sdk == "auto":
            space_sdk = "static"

        project_name = repo_name.split("/")[-1] if "/" in repo_name else repo_name
        result = push_to_huggingface(
            files=project_files,
            project_name=project_name,
            repo_name=repo_name,
            hf_token=hf_token,
            space_sdk=space_sdk,
            is_space=is_space.lower() == "true",
        )
        yield json.dumps(result)
    except Exception as exc:
        logger.exception("Push to HuggingFace failed")
        yield json.dumps({
            "success": False,
            "message": f"Push failed: {str(exc)}",
            "url": "",
        })
def get_app() -> Server:
    """Expose the module-level server instance with all routes registered."""
    return app
# ─── Agent / Skills / Commands / Hooks / Workspace Endpoints ──────────
@app.api(name="agent_run", concurrency_limit=2)
def handle_agent_run(
    prompt: str,
    target_language: str = "",
    target_framework: str = "",
    history_json: str = "[]",
    skills_json: str = "[]",
    search_enabled: str = "false",
    image_url: str = "",
    agent_name: str = "",
) -> str:
    """Run the Claude Code-style agent loop with tools.

    The ``/agent use``, ``/agent reset``, ``/agent delete`` and ``/agent list``
    slash commands (and a bare ``/agent``, which resets) are handled here
    before the model runs. Other ``/agent`` sub-commands fall through to the
    agent loop.

    Args:
        prompt: User input; a blank prompt yields a single error event.
        target_language: Passed through to ``run_agent``.
        target_framework: Passed through to ``run_agent``.
        history_json: JSON-encoded history (empty string means none).
        skills_json: JSON-encoded skills selection (empty string means none).
        search_enabled: ``"true"`` (case-insensitive) to run a web search first.
        image_url: Optional image URL; blank is passed as ``None``.
        agent_name: Optional agent override for this run; blank is passed
            as ``None``.

    Yields:
        JSON strings. Slash commands yield one ``complete`` or ``error`` event.
        Otherwise an optional ``search_results`` event is followed by whatever
        events ``run_agent`` produces, or an ``error`` event if it raises.
    """
    from code.agent import run_agent

    history = json.loads(history_json) if history_json else []
    skills = json.loads(skills_json) if skills_json else []
    prompt = (prompt or "").strip()
    if not prompt:
        yield json.dumps({
            "type": "error",
            "message": "Empty prompt",
        })
        return

    # ── Intercept /agent use|reset|delete|list (session-state mutations) ──
    # These happen server-side BEFORE the model runs so the very next prompt
    # reflects the change.
    if prompt.startswith("/agent ") or prompt == "/agent":
        from code.agents import (
            set_active_agent,
            delete_agent as _delete_agent,
            list_agents as _list_agents,
        )

        parts = prompt.split(None, 2)  # ["/agent", <sub>, <rest>]
        sub = parts[1] if len(parts) > 1 else ""
        arg = parts[2].strip() if len(parts) > 2 else ""

        if sub == "use" and arg:
            result = set_active_agent(arg)
            if not result.get("success"):
                # Report failures as errors, as the delete branch does,
                # instead of announcing an activation that did not happen.
                yield json.dumps({
                    "type": "error",
                    "message": (
                        result.get("message")
                        or result.get("error")
                        or f"Failed to activate agent '{arg}'"
                    ),
                    "agent": result.get("active_agent"),
                    "agent_op": "use",
                })
                return
            yield json.dumps({
                "type": "complete",
                "content": (
                    f"**Agent activated: `{result.get('active_agent')}`**\n\n"
                    "Subsequent prompts will use this agent's persona and tool whitelist."
                ),
                "agent": result.get("active_agent"),
                "agent_op": "use",
            })
            return

        if sub == "reset" or (sub == "" and arg == ""):
            set_active_agent(None)
            yield json.dumps({
                "type": "complete",
                "content": "**Active agent reset.** Subsequent prompts will use the default SoniCoder persona.",
                "agent": None,
                "agent_op": "reset",
            })
            return

        if sub == "delete" and arg:
            result = _delete_agent(arg)
            if result.get("success"):
                yield json.dumps({
                    "type": "complete",
                    "content": f"**Agent `{arg}` deleted.**",
                    "agent": None,
                    "agent_op": "delete",
                })
            else:
                yield json.dumps({
                    "type": "error",
                    "message": result.get("error", f"Failed to delete agent '{arg}'"),
                    "agent_op": "delete",
                })
            return

        if sub == "list":
            agents_list = _list_agents()
            if not agents_list:
                content = "_No agents available._ Create one with `/agent create <description>`."
            else:
                lines = [
                    "| Name | Description | Author | Tools |",
                    "|------|-------------|--------|-------|",
                ]
                for a in agents_list:
                    tools = ", ".join(a.get("tools", [])) or "(all)"
                    active_marker = " **(active)**" if a.get("active") else ""
                    lines.append(
                        f"| `{a['name']}`{active_marker} | {a.get('description', '')[:80]} | {a.get('author', '')} | {tools} |"
                    )
                content = "\n".join(lines)
            yield json.dumps({
                "type": "complete",
                "content": content,
                "agents": agents_list,
                "agent_op": "list",
            })
            return
        # /agent create|show → fall through to the model (handled by slash
        # command expansion).

    # Optional web search; failures are logged and the run continues.
    search_context = ""
    if search_enabled.lower() == "true":
        try:
            search_results = web_search_google(prompt, num_results=6)
            if search_results:
                search_context = format_search_results(search_results)
                yield json.dumps({
                    "type": "search_results",
                    "results": search_results,
                    "status_text": f"Found {len(search_results)} results, running agent...",
                })
        except Exception as exc:
            logger.warning("Web search failed: %s", exc)

    try:
        for event in run_agent(
            user_input=prompt,
            history=history,
            target_language=target_language,
            target_framework=target_framework,
            skills=skills,
            search_context=search_context,
            image_url=image_url.strip() or None,
            agent_name=agent_name.strip() or None,
        ):
            yield json.dumps(event, default=str)
    except Exception as exc:
        logger.exception("Agent run failed")
        yield json.dumps({
            "type": "error",
            "message": str(exc),
        })
@app.api(name="list_skills", concurrency_limit=4)
def handle_list_skills() -> str:
    """Yield every available skill as a JSON payload."""
    from code.skills import list_skills

    yield json.dumps({"success": True, "skills": list_skills()})
@app.api(name="list_commands", concurrency_limit=4)
def handle_list_commands() -> str:
    """Yield every available slash command as a JSON payload."""
    from code.commands import list_commands

    yield json.dumps({"success": True, "commands": list_commands()})
@app.api(name="list_hooks", concurrency_limit=4)
def handle_list_hooks() -> str:
    """Yield every configured hook as a JSON payload."""
    from code.hooks import list_hooks

    yield json.dumps({"success": True, "hooks": list_hooks()})
@app.api(name="workspace_tree", concurrency_limit=4)
def handle_workspace_tree() -> str:
    """Yield the workspace file tree as JSON (non-JSON values are stringified)."""
    from code.tools.fs import list_workspace_tree

    tree = list_workspace_tree()
    yield json.dumps(tree, default=str)
@app.api(name="workspace_read", concurrency_limit=4)
def handle_workspace_read(path: str, offset: int = 0, limit: int = 0) -> str:
    """Read a workspace file and yield the result as JSON.

    ``offset`` and ``limit`` are forwarded to ``read_file`` only when they are
    non-zero, so the reader's own defaults apply otherwise.
    """
    from code.tools.fs import read_file

    kwargs = {"path": path}
    for key, value in (("offset", offset), ("limit", limit)):
        if value:
            kwargs[key] = value
    yield json.dumps(read_file(**kwargs), default=str)
@app.api(name="workspace_write", concurrency_limit=1)
def handle_workspace_write(path: str, content: str) -> str:
    """Write ``content`` to a workspace file and yield the result as JSON."""
    from code.tools.fs import write_file

    outcome = write_file(path=path, content=content)
    yield json.dumps(outcome, default=str)
@app.api(name="workspace_bash", concurrency_limit=1)
def handle_workspace_bash(command: str, timeout: int = 30) -> str:
    """Run a bash command via ``run_bash`` and yield the result as JSON."""
    from code.tools.bash import run_bash

    outcome = run_bash(command=command, timeout=timeout)
    yield json.dumps(outcome, default=str)
@app.api(name="workspace_edit", concurrency_limit=1)
def handle_workspace_edit(
    path: str,
    old_str: str,
    new_str: str,
    replace_all: str = "false",
) -> str:
    """Apply a string replacement to a workspace file and yield the result.

    ``replace_all`` is a string flag; only ``"true"`` (case-insensitive)
    enables it.
    """
    from code.tools.fs import edit_file

    every_occurrence = replace_all.lower() == "true"
    outcome = edit_file(
        path=path,
        old_str=old_str,
        new_str=new_str,
        replace_all=every_occurrence,
    )
    yield json.dumps(outcome, default=str)
@app.api(name="workspace_glob", concurrency_limit=4)
def handle_workspace_glob(pattern: str, path: str = ".") -> str:
    """Match workspace paths against a glob pattern and yield the result as JSON."""
    from code.tools.fs import glob_paths

    matches = glob_paths(pattern=pattern, path=path)
    yield json.dumps(matches, default=str)
@app.api(name="workspace_grep", concurrency_limit=4)
def handle_workspace_grep(
    pattern: str,
    path: str = ".",
    include: str = "",
    ignore_case: str = "false",
) -> str:
    """Search workspace file contents and yield the result as JSON.

    An empty ``include`` is forwarded as ``None``; ``ignore_case`` is a string
    flag enabled only by ``"true"`` (case-insensitive).
    """
    from code.tools.fs import grep_search

    include_filter = include or None
    case_insensitive = ignore_case.lower() == "true"
    outcome = grep_search(
        pattern=pattern,
        path=path,
        include=include_filter,
        ignore_case=case_insensitive,
    )
    yield json.dumps(outcome, default=str)
@app.api(name="todo_read", concurrency_limit=4)
def handle_todo_read(session_id: str = "default") -> str:
    """Yield the todo list of the given session as JSON."""
    from code.tools.todos import todo_read

    current = todo_read(session_id=session_id)
    yield json.dumps(current, default=str)
@app.api(name="todo_write", concurrency_limit=1)
def handle_todo_write(todos_json: str, session_id: str = "default") -> str:
    """Replace the session's todo list with the JSON-encoded ``todos_json``.

    An empty ``todos_json`` is treated as an empty list.
    """
    from code.tools.todos import todo_write

    if todos_json:
        new_todos = json.loads(todos_json)
    else:
        new_todos = []
    outcome = todo_write(todos=new_todos, session_id=session_id)
    yield json.dumps(outcome, default=str)
@app.api(name="workspace_snapshot", concurrency_limit=2)
def handle_workspace_snapshot() -> str:
    """Yield all workspace files plus their count, for ZIP/deploy use."""
    from code.tools.fs import snapshot_workspace

    snapshot = snapshot_workspace()
    payload = {"success": True, "files": snapshot, "count": len(snapshot)}
    yield json.dumps(payload)
@app.api(name="workspace_reset", concurrency_limit=1)
def handle_workspace_reset() -> str:
    """Clear the workspace via ``reset_workspace`` and yield the result as JSON."""
    from code.tools.fs import reset_workspace

    outcome = reset_workspace()
    yield json.dumps(outcome, default=str)
@app.api(name="create_hook", concurrency_limit=1)
def handle_create_hook(
    name: str,
    event: str,
    pattern: str,
    action: str = "warn",
    message: str = "",
    enabled: str = "true",
) -> str:
    """Create a user hook and yield the result as JSON.

    ``enabled`` is a string flag; only ``"true"`` (case-insensitive) enables
    the hook.
    """
    from code.hooks import create_hook

    is_enabled = enabled.lower() == "true"
    outcome = create_hook(
        name=name,
        event=event,
        pattern=pattern,
        action=action,
        message=message,
        enabled=is_enabled,
    )
    yield json.dumps(outcome, default=str)
@app.api(name="delete_hook", concurrency_limit=1)
def handle_delete_hook(name: str) -> str:
    """Delete the user hook called ``name`` and yield the result as JSON."""
    from code.hooks import delete_hook

    outcome = delete_hook(name)
    yield json.dumps(outcome, default=str)
# ─── Custom Agent Endpoints ────────────────────────────────────────────
@app.api(name="list_agents", concurrency_limit=4)
def handle_list_agents() -> str:
    """Yield all available agents together with the currently active one."""
    from code.agents import list_agents, get_active_agent

    payload = {
        "success": True,
        "agents": list_agents(),
        "active_agent": get_active_agent(),
    }
    yield json.dumps(payload, default=str)
@app.api(name="get_agent", concurrency_limit=4)
def handle_get_agent(name: str) -> str:
    """Yield the full definition of one agent, or a not-found error.

    The ``path`` entry of the definition is omitted from the response.
    """
    from code.agents import get_agent

    definition = get_agent(name)
    if not definition:
        yield json.dumps({"success": False, "error": f"Agent not found: {name}"})
        return

    # Drop the path entry before serializing.
    public_fields = {}
    for key, value in definition.items():
        if key != "path":
            public_fields[key] = value
    yield json.dumps({"success": True, "agent": public_fields}, default=str)
@app.api(name="save_agent", concurrency_limit=1)
def handle_save_agent(
    name: str,
    description: str,
    body: str,
    tools: str = "",
    skills: str = "",
    temperature: str = "",
    max_iterations: str = "",
    tags: str = "",
    author: str = "user",
) -> str:
    """Create or overwrite a custom agent definition (manual save, no AI).

    ``tools``, ``skills`` and ``tags`` are comma-separated strings; an empty
    ``tools`` selects every tool in ``ALL_TOOLS``. ``temperature`` and
    ``max_iterations`` are parsed as float and int when non-blank; a value
    that fails to parse yields an error payload and nothing is saved.
    """
    from code.agents import save_agent, ALL_TOOLS

    def _csv(raw: str) -> list[str]:
        """Split a comma-separated string, dropping blank entries."""
        items = []
        for piece in (raw or "").split(","):
            piece = piece.strip()
            if piece:
                items.append(piece)
        return items

    tools_list = _csv(tools) or list(ALL_TOOLS)
    skills_list = _csv(skills)
    tags_list = _csv(tags)

    parsed_temperature = None
    if temperature.strip():
        try:
            parsed_temperature = float(temperature)
        except ValueError:
            yield json.dumps({"success": False, "error": f"Invalid temperature: {temperature}"})
            return

    parsed_iterations = None
    if max_iterations.strip():
        try:
            parsed_iterations = int(max_iterations)
        except ValueError:
            yield json.dumps({"success": False, "error": f"Invalid max_iterations: {max_iterations}"})
            return

    outcome = save_agent(
        name=name,
        description=description,
        body=body,
        tools=tools_list,
        skills=skills_list,
        temperature=parsed_temperature,
        max_iterations=parsed_iterations,
        tags=tags_list,
        author=author,
    )
    yield json.dumps(outcome, default=str)
@app.api(name="delete_agent", concurrency_limit=1)
def handle_delete_agent(name: str) -> str:
    """Delete the agent called ``name`` and yield the result as JSON."""
    from code.agents import delete_agent

    outcome = delete_agent(name)
    yield json.dumps(outcome, default=str)
@app.api(name="set_active_agent", concurrency_limit=1)
def handle_set_active_agent(name: str = "") -> str:
    """Set the active agent for subsequent prompts; a blank name resets it.

    On failure the raw result is yielded. On success the result is extended
    with the fresh agent list and the active agent so the frontend can
    re-render.
    """
    from code.agents import set_active_agent, list_agents, get_active_agent

    requested = name.strip() or None
    outcome = set_active_agent(requested)
    if outcome.get("success"):
        response = dict(outcome)
        response["agents"] = list_agents()
        response["active_agent"] = get_active_agent()
    else:
        response = outcome
    yield json.dumps(response, default=str)
# ─── GitHub Import Endpoint ────────────────────────────────────────────
@app.api(name="import_github", concurrency_limit=1)
def handle_import_github(
    url: str,
    branch: str = "",
    subdir: str = "",
    target_subdir: str = "",
    depth: str = "1",
    timeout: str = "120",
) -> str:
    """Clone a GitHub repo into the sandboxed workspace.

    Parameters
    ----------
    url : str
        GitHub URL. Accepts:
          - https://github.com/<owner>/<repo>[.git]
          - https://github.com/<owner>/<repo>/tree/<branch>[/<subdir>]
          - git@github.com:<owner>/<repo>.git
    branch : str
        Optional branch/tag override. If empty, uses the URL's branch or the
        repo's default branch.
    subdir : str
        Optional sub-directory inside the repo to import.
    target_subdir : str
        Where inside the workspace to place the import. Empty = root.
    depth : str
        Git clone depth, clamped to 1..50 (default "1" for a shallow clone).
        Blank or unparsable values fall back to 1.
    timeout : str
        Git clone timeout in seconds, clamped to 10..600 (default "120").
        Blank or unparsable values fall back to 120.

    Yields
    ------
    JSON dict with keys: success, message, url, owner, repo, branch,
    subdir, files_imported, dirs_skipped, workspace_path, tree_preview.
    """
    from code.tools.github import import_github_repo

    def _bounded_int(raw, fallback: int, low: int, high: int) -> int:
        """Parse ``raw`` as an int clamped to [low, high], else ``fallback``."""
        try:
            value = int(raw) if str(raw).strip() else fallback
            return max(low, min(high, value))
        except (ValueError, TypeError):
            return fallback

    clone_depth = _bounded_int(depth, 1, 1, 50)
    clone_timeout = _bounded_int(timeout, 120, 10, 600)

    outcome = import_github_repo(
        url=url,
        branch=branch,
        subdir=subdir,
        target_subdir=target_subdir,
        depth=clone_depth,
        timeout=clone_timeout,
    )
    yield json.dumps(outcome, default=str)
@app.api(name="github_url_examples", concurrency_limit=4)
def handle_github_url_examples() -> str:
    """Yield the example GitHub URL formats accepted by import_github."""
    from code.tools.github import list_github_url_examples

    examples = list_github_url_examples()
    yield json.dumps(examples, default=str)
@app.api(name="push_github", concurrency_limit=1)
def handle_push_github(
    repo_name: str,
    github_token: str,
    username: str,
    branch: str = "main",
    commit_message: str = "",
    timeout: str = "120",
) -> str:
    """Push the current workspace to a GitHub repo.

    Only ``repo_name``, ``github_token`` and ``username`` are required;
    ``branch`` (blank means "main"), ``commit_message`` and ``timeout`` are
    optional. ``timeout`` is clamped to 10..600 seconds and falls back to 120
    when blank or unparsable. The push itself is done by ``push_to_github``.

    Per the original notes, the workspace is snapshotted, committed in a fresh
    temp repo and pushed over HTTPS to
    `https://github.com/<username>/<repo_name>.git` with `--force-with-lease`,
    retrying with a plain push when the remote has no refs yet.

    Yields
    ------
    JSON dict with keys: success, message, repo_full_name, branch,
    commit_sha, commit_url, repo_url, files_pushed, error (on failure).
    """
    from code.tools.github import push_to_github

    push_timeout = 120
    try:
        if str(timeout).strip():
            push_timeout = int(timeout)
        push_timeout = max(10, min(600, push_timeout))
    except (ValueError, TypeError):
        push_timeout = 120

    outcome = push_to_github(
        repo_name=repo_name,
        github_token=github_token,
        username=username,
        branch=branch or "main",
        commit_message=commit_message or "",
        timeout=push_timeout,
    )
    yield json.dumps(outcome, default=str)