| import os |
| import subprocess |
| import csv |
| import shutil |
| import threading |
| import logging |
| import signal |
| import sys |
| from pathlib import Path |
| from datetime import datetime |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
| |
| |
| OPENAI_API_KEY = "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" |
|
|
| |
| SOURCE_REPOS_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_raw").resolve() |
|
|
| |
| BASE_OUTPUT_DIR = Path("~/chemrepo").expanduser().resolve() |
|
|
| |
| GLOBAL_ERROR_LOG = BASE_OUTPUT_DIR / "failures.log" |
|
|
| |
| CSV_FILE = BASE_OUTPUT_DIR / "run.csv" |
|
|
| |
| MAX_WORKERS = 256 |
| |
|
|
| |
| os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY |
|
|
| |
| BASE_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) |
|
|
| |
| |
| error_log_lock = threading.Lock() |
| |
| active_projects = set() |
| active_projects_lock = threading.Lock() |
|
|
def add_active_project(name):
    """Thread-safely record *name* as a project currently being processed."""
    with active_projects_lock:
        active_projects.add(name)
|
|
def remove_active_project(name):
    """Thread-safely drop *name* from the in-flight project set (no-op if absent)."""
    with active_projects_lock:
        active_projects.discard(name)
|
|
def log_failure_globally(project_name, content, extra_info=""):
    """Append a failure record for *project_name* to the global failure log.

    Thread-safe: the whole record is assembled first and written while holding
    ``error_log_lock``, so entries from concurrent workers never interleave.
    """
    separator = "=" * 40
    record = [
        f"\n{separator}\n",
        f"PROJECT: {project_name}\n",
        f"TIME: {datetime.now()}\n",
        "STATUS: Failed/Interrupted\n",
        f"{separator}\n",
        content,
    ]
    if extra_info:
        record.append(f"\n[Details]: {extra_info}\n")
    record.append(f"\n{separator}\n")

    with error_log_lock, open(GLOBAL_ERROR_LOG, "a", encoding="utf-8") as g_log:
        g_log.write("".join(record))
|
|
def cleanup_project_folder(project_name):
    """Delete the output directory created for *project_name*, if it exists."""
    project_out_dir = BASE_OUTPUT_DIR / project_name
    if not project_out_dir.exists():
        return
    try:
        shutil.rmtree(project_out_dir)
    except OSError as e:
        print(f"⚠️ Failed to delete directory {project_out_dir}: {e}")
    else:
        print(f"🗑️ Deleted failed/interrupted directory: {project_out_dir}")
|
|
def process_single_project(project_path):
    """Run repoagent on a single project directory and report the outcome.

    Returns a dict with keys "project", "status", "start_time", "end_time",
    where status is one of:
      - "Skipped":      output (hp/ and mdp/) already exists from a prior run
      - "Success":      repoagent finished and produced documentation
      - "EmptyProject": repoagent finished but the mdp/ folder is empty
      - "Failed":       repoagent (or this wrapper) errored; the project's
                        output directory is deleted and the details are
                        appended to GLOBAL_ERROR_LOG.
    """
    project_name = project_path.name
    start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    project_out_dir = BASE_OUTPUT_DIR / project_name
    hp_dir = project_out_dir / "hp"
    mdp_dir = project_out_dir / "mdp"
    local_log_file = project_out_dir / "process.log"

    # Resume support: if both output folders already exist, assume an earlier
    # run completed this project and skip it.
    if hp_dir.exists() and mdp_dir.exists():
        return {
            "project": project_name,
            "status": "Skipped",
            "start_time": start_time,
            "end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    # Register the project so the Ctrl+C handler knows what to clean up.
    add_active_project(project_name)

    project_out_dir.mkdir(parents=True, exist_ok=True)

    status = "Failed"
    python_error = None

    try:
        with open(local_log_file, "w", encoding="utf-8") as log_f:
            try:
                log_f.write(f"[{datetime.now()}] Processing project: {project_name}\n")

                # Ensure a .gitignore exists in the target repo.
                # NOTE(review): presumably required by repoagent — confirm.
                gitignore_path = project_path / ".gitignore"
                if not gitignore_path.exists():
                    gitignore_path.touch()
                    log_f.write(f"[{datetime.now()}] Created .gitignore file.\n")

                cmd = [
                    "repoagent", "run",
                    "-m", "gpt-5.1-2025-11-13",
                    "-r", "1",
                    "-tp", str(project_path.absolute()),
                    "--print-hierarchy",
                    "-hp", str(hp_dir),
                    "-mdp", str(mdp_dir),
                ]

                log_f.write(f"[{datetime.now()}] Command: {' '.join(cmd)}\n")
                log_f.write(f"[{datetime.now()}] Starting RepoAgent...\n")
                # Flush before handing the file to the subprocess so its output
                # lands after ours in the log.
                log_f.flush()

                # Stream repoagent's stdout/stderr into the per-project log;
                # check=True turns a non-zero exit into CalledProcessError.
                subprocess.run(cmd, stdout=log_f, stderr=subprocess.STDOUT, check=True)

                # repoagent can exit 0 without writing any docs; treat that as
                # a distinct outcome rather than a success.
                has_docs = mdp_dir.exists() and any(mdp_dir.iterdir())
                if has_docs:
                    status = "Success"
                    log_f.write(f"\n[{datetime.now()}] Completed successfully.\n")
                else:
                    status = "EmptyProject"
                    log_f.write(f"\n[{datetime.now()}] Finished, but mdp folder is EMPTY. Marked as EmptyProject.\n")

            except Exception as e:
                # FIX: was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit and broke the Ctrl+C path.
                status = "Failed"
                python_error = str(e)
                try:
                    log_f.write(f"\n[{datetime.now()}] ERROR: {python_error}\n")
                except OSError:
                    pass  # best effort: the log itself may be unwritable
                print(f"❌ Error processing {project_name}: {python_error}")

        if status == "Failed":
            # Preserve the per-project log in the global failure log before
            # the project directory (log included) is deleted below.
            failed_log_content = ""
            if local_log_file.exists():
                try:
                    failed_log_content = local_log_file.read_text(encoding="utf-8", errors="ignore")
                except OSError:
                    failed_log_content = "Read Error"

            log_failure_globally(project_name, failed_log_content, python_error)
            cleanup_project_folder(project_name)

    except Exception as e:
        # FIX: previously `pass`, which silently discarded wrapper failures
        # (e.g. process.log could not be created) with no log entry and no
        # cleanup. Record the error and clean up like any other failure.
        status = "Failed"
        python_error = str(e)
        log_failure_globally(project_name, "Wrapper error (no project log available).", python_error)
        cleanup_project_folder(project_name)
    finally:
        remove_active_project(project_name)

    return {
        "project": project_name,
        "status": status,
        "start_time": start_time,
        "end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
|
|
def main():
    """Discover project folders and document each one concurrently.

    Appends one CSV row per project as results complete. On Ctrl+C, cancels
    pending work and removes the output directories of projects that were
    still in flight.
    """
    if not SOURCE_REPOS_DIR.exists():
        print(f"Error: Source directory {SOURCE_REPOS_DIR} does not exist.")
        return

    csv_headers = ["project", "status", "start_time", "end_time"]

    # Write the header only on the first run so the CSV can be appended to
    # across restarts.
    if not CSV_FILE.exists():
        with open(CSV_FILE, mode='w', newline='', encoding='utf-8') as f:
            csv.DictWriter(f, fieldnames=csv_headers).writeheader()

    projects = sorted((p for p in SOURCE_REPOS_DIR.iterdir() if p.is_dir()), key=lambda x: x.name)

    print(f"Found {len(projects)} projects (Sorted A-Z).\nOutput Dir: {BASE_OUTPUT_DIR}")
    print(f"Failures Log: {GLOBAL_ERROR_LOG}")
    print(f"Starting concurrent processing with {MAX_WORKERS} workers...\n")
    print(f"💡 Press Ctrl+C to stop. Interrupted projects will be cleaned up automatically.\n")

    # Deliberately not a `with` block: on KeyboardInterrupt we need
    # shutdown(wait=False, cancel_futures=True) instead of the blocking
    # join a context manager exit would perform.
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

    try:
        future_to_project = {executor.submit(process_single_project, p): p for p in projects}

        with open(CSV_FILE, mode='a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=csv_headers)

            for future in as_completed(future_to_project):
                result = future.result()
                writer.writerow(result)
                f.flush()  # keep the CSV current even if we crash later

                status = result["status"]
                if status == "Success":
                    print(f"✅ {result['project']} Finished.")
                elif status == "EmptyProject":
                    print(f"⚠️ {result['project']} Finished (Empty - No Docs Generated).")
                elif status == "Skipped":
                    print(f"⏭️ {result['project']} Skipped.")
                else:
                    print(f"❌ {result['project']} Failed.")

        # FIX: all futures have completed, but the pool's threads were never
        # released; join them explicitly instead of relying on interpreter
        # exit handlers.
        executor.shutdown(wait=True)

    except KeyboardInterrupt:
        print("\n\n🛑 KeyboardInterrupt detected! Stopping workers...")
        # cancel_futures (Python 3.9+) drops every task not yet started.
        executor.shutdown(wait=False, cancel_futures=True)

        print("🧹 Cleaning up active incomplete projects...")
        with active_projects_lock:
            projects_to_clean = list(active_projects)

        for proj_name in projects_to_clean:
            log_failure_globally(proj_name, "Process terminated by User (KeyboardInterrupt).")
            cleanup_project_folder(proj_name)

        print("Done. Exiting.")
        sys.exit(0)

    print(f"\nAll tasks completed. \nCSV: {CSV_FILE}")
|
|
# Script entry point: run the batch only when executed directly, not on import.
if __name__ == "__main__":
    main()