Spaces:
Running
Running
fix: count annotator history from both HF snapshot + local file to survive restarts
bffee2e verified | """ | |
| MBench Annotation Space — 单视频标注 + MBench-V Pairwise + MBench-A Pairwise | |
| 功能: | |
| - Tab 1 (单视频标注): "该视频是否出现了记忆问题?" (MBench-V) | |
| - Tab 2 (MBench-V Pairwise): 同一 prompt 下两个 T2V 模型视频并排 (MBench-V) | |
| - Tab 3 (MBench-A Pairwise): 世界模型 401f 视频对比,4子集×多维度 (MBench-A) | |
| 技术栈: | |
| - Gradio 5.9.1 + FastAPI 视频代理 | |
| - HuggingFace CommitScheduler 自动推送标注结果 | |
| - 数据来源: studyOverflow/TempMemoryData | |
| 部署: | |
| 直接替换 HuggingFace Space 的 app.py 即可。 | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import random | |
| import threading | |
| import time | |
| import uuid | |
| from collections import defaultdict | |
| from pathlib import Path | |
| from typing import Any | |
| import gradio as gr | |
| from huggingface_hub import CommitScheduler, HfApi, hf_hub_download, hf_hub_url | |
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Dataset repo that holds the source videos/tasks and receives annotations.
DATASET_REPO = "studyOverflow/TempMemoryData"
MERGED_JSON_PATH = "MBench-V/merged.json"

# MBench-V (text-to-video) models under evaluation.
MODELS: list[str] = [
    "causal_forcing",
    "self_forcing",
    "cosmos",
    "helios",
    "longlive",
    "memflow",
    "longcat",
    "skyreels",
]

HF_TOKEN = os.environ.get("HF_TOKEN")

# Local annotation output.  Every process gets a random 8-hex-char id and
# writes its own set of files, one per annotation type.
ANN_DIR = Path("annotations_local")
ANN_DIR.mkdir(exist_ok=True)
PROCESS_ID = uuid.uuid4().hex[:8]
ANN_FILE_BINARY, ANN_FILE_PAIRWISE, ANN_FILE_MBENCH_A = (
    ANN_DIR / f"ann_{kind}_{PROCESS_ID}.jsonl"
    for kind in ("binary", "pairwise", "mbench_a")
)

COMMIT_INTERVAL_MIN = 5        # push period passed to CommitScheduler(every=...)
PENDING_TIMEOUT_SEC = 30 * 60  # a claimed-but-unsubmitted item is released after this

# MBench-V pairwise dimensions: (key, label, question shown to annotators).
PAIRWISE_DIMENSIONS = [
    ("entity", "实体一致性", "人物/物体离开画面再回来后,哪个视频中实体外观更一致?"),
    ("physical", "物理合理性", "哪个视频中的物理过程(水流/碰撞/变形等)更合理自然?"),
    ("prompt", "Prompt 忠实度", "哪个视频的内容更符合下方的文字描述?"),
]
# Number of tasks (taken from the start of TASKS) used for pairwise items.
PAIRWISE_SAMPLES_PER_PAIR = 30
# ---------------------------------------------------------------------------
# MBench-A Config
# ---------------------------------------------------------------------------
# World models compared in the MBench-A pairwise tab.
MBENCH_A_MODELS: list[str] = [
    "hy_worldplay",
    "infinite_world",
    "lingbot_world",
    "matrix_game_2",
    "matrix_game_3",
    "yume",
]
# A task stops being handed out once this many annotators have completed it.
MBENCH_A_ANNOTATORS_PER_TASK = 3
# Task subset -> video directory name used in dataset paths.  The "Casual"
# spelling is used verbatim in those paths; do not change it without also
# renaming the directories in the dataset.
MBENCH_A_CATEGORY_MAP = {
    "environment": "Spatial_401f",
    "object": "Spatial_401f",
    "human": "Human_401f",
    "causal": "Casual_401f",
}
# Same mapping with the "_401f" suffix removed (presumably the ground-truth
# category names, per the "GT" in the name — confirm with its consumers).
MBENCH_A_GT_CATEGORY_MAP = {
    subset: category.removesuffix("_401f")
    for subset, category in MBENCH_A_CATEGORY_MAP.items()
}
# ---------------------------------------------------------------------------
# Load MBench-V merged.json
# ---------------------------------------------------------------------------
def _load_merged() -> list[dict[str, Any]]:
    """Download and parse the MBench-V task list (``merged.json``).

    Best effort: on any failure a warning is printed and an empty list is
    returned, so the app still starts (with an empty MBench-V pool).
    """
    try:
        path = hf_hub_download(
            repo_id=DATASET_REPO,
            filename=MERGED_JSON_PATH,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        with open(path, encoding="utf-8") as fh:
            tasks = json.load(fh)
    except Exception as exc:
        print(f"[mbench-ann] WARNING: Failed to load MBench-V data: {exc}")
        return []
    return tasks


TASKS: list[dict[str, Any]] = _load_merged()
TASK_BY_ID: dict[str, dict[str, Any]] = {task["task_id"]: task for task in TASKS}
# ---------------------------------------------------------------------------
# Load MBench-A task pool
# ---------------------------------------------------------------------------
def _load_mbench_a_pool() -> dict[str, Any]:
    """Return the MBench-A task pool.

    Prefers a bundled ``sampling/task_pool.json`` next to this file.  If that
    file does not exist, ``MBench-A/task_pool.json`` is downloaded from the
    dataset repo; a failed download/parse prints a warning and yields an empty
    pool with the expected keys.  A malformed *bundled* file is not caught.
    """
    bundled = Path(__file__).parent / "sampling" / "task_pool.json"
    if not bundled.exists():
        try:
            downloaded = hf_hub_download(
                repo_id=DATASET_REPO,
                filename="MBench-A/task_pool.json",
                repo_type="dataset",
                token=HF_TOKEN,
            )
            with open(downloaded, encoding="utf-8") as fh:
                return json.load(fh)
        except Exception as exc:
            print(f"[mbench-ann] WARNING: Failed to load MBench-A task pool: {exc}")
            return {"tasks": [], "quality_control_tasks": [], "metadata": {}}
    with open(bundled, encoding="utf-8") as fh:
        return json.load(fh)


MBENCH_A_POOL = _load_mbench_a_pool()
# Regular tasks followed by quality-control tasks; both are served from the
# same list.
MBENCH_A_TASKS: list[dict] = [
    *MBENCH_A_POOL.get("tasks", []),
    *MBENCH_A_POOL.get("quality_control_tasks", []),
]
MBENCH_A_TASK_BY_ID: dict[str, dict] = {task["task_id"]: task for task in MBENCH_A_TASKS}
# ---------------------------------------------------------------------------
# MBench-V Pool setup
# ---------------------------------------------------------------------------
# One binary-annotation item per (model, task_id), grouped by model.
BINARY_POOL: list[tuple[str, str]] = [
    (model, task["task_id"])
    for model in MODELS
    for task in TASKS
]
# Set view of the same items, for O(1) membership checks.
BINARY_POOL_SET: set[tuple[str, str]] = set(BINARY_POOL)
def _build_pairwise_pool() -> list[tuple[str, str, str, str]]:
    """Enumerate every MBench-V pairwise item.

    Items are ``(task_id, model_a, model_b, dimension_key)`` over the first
    ``PAIRWISE_SAMPLES_PER_PAIR`` tasks, every unordered model pair
    (``model_a`` always comes before ``model_b`` in ``MODELS``) and every
    dimension in ``PAIRWISE_DIMENSIONS``.
    """
    task_ids = [task["task_id"] for task in TASKS[:PAIRWISE_SAMPLES_PER_PAIR]]
    model_pairs = [
        (first, second)
        for pos, first in enumerate(MODELS)
        for second in MODELS[pos + 1:]
    ]
    return [
        (tid, first, second, dim_key)
        for tid in task_ids
        for first, second in model_pairs
        for dim_key, _label, _question in PAIRWISE_DIMENSIONS
    ]
PAIRWISE_POOL: list[tuple[str, str, str, str]] = _build_pairwise_pool()
PAIRWISE_POOL_SET: set[tuple[str, str, str, str]] = set(PAIRWISE_POOL)

# Startup summary for the Space logs (one line per message).
print(
    f"[mbench-ann] MBench-V: {len(TASKS)} tasks × {len(MODELS)} models",
    f"[mbench-ann] MBench-V binary pool: {len(BINARY_POOL)}, pairwise pool: {len(PAIRWISE_POOL)}",
    f"[mbench-ann] MBench-A: {len(MBENCH_A_TASKS)} tasks, {len(MBENCH_A_POOL.get('metadata', {}))} metadata",
    sep="\n",
)
# ---------------------------------------------------------------------------
# Video URL helpers
# ---------------------------------------------------------------------------
def _video_url(model: str, task_id: str) -> str:
    """Return the same-origin proxy path for an MBench-V video.

    The path matches the ``/video/{model}/{task_id}.mp4`` route registered
    when the app is launched as a script.
    """
    return "/video/{}/{}.mp4".format(model, task_id)
def _hf_video_url(model: str, task_id: str) -> str:
    """Return the upstream Hugging Face URL of an MBench-V video."""
    repo_path = f"MBench-V/{model}/videos/{task_id}.mp4"
    return hf_hub_url(DATASET_REPO, filename=repo_path, repo_type="dataset")
def _mbench_a_video_proxy_url(model: str, subset: str, sample_id: str) -> str:
    """Return the same-origin proxy path for an MBench-A video.

    ``subset`` is translated to its dataset directory through
    ``MBENCH_A_CATEGORY_MAP``; an unknown subset raises ``KeyError``.
    """
    return "/video_a/{}/{}/{}/left_then_right.mp4".format(
        model, MBENCH_A_CATEGORY_MAP[subset], sample_id
    )
def _mbench_a_hf_video_url(model: str, category: str, sample_id: str) -> str:
    """Return the upstream Hugging Face URL of an MBench-A video.

    ``category`` is a dataset directory name (e.g. a value of
    ``MBENCH_A_CATEGORY_MAP``), not a subset key.
    """
    repo_path = f"MBench-A/{model}/{category}/{sample_id}/left_then_right.mp4"
    return hf_hub_url(DATASET_REPO, filename=repo_path, repo_type="dataset")
def _mbench_a_asset_hf_url(path: str) -> str:
    """Return the Hugging Face URL of ``MBench-A/assets/<path>`` in the dataset."""
    repo_path = f"MBench-A/assets/{path}"
    return hf_hub_url(DATASET_REPO, filename=repo_path, repo_type="dataset")
def _extract_prompt(task: dict[str, Any]) -> str:
    """Pick the prompt text to display for an MBench-V task.

    Looks in ``task["generation_prompts"]["prompts"]`` and uses the first
    non-empty entry in priority order level_3, level_4, level_2, level_1.
    A list entry is rendered as numbered segments separated by blank lines;
    a string entry is returned unchanged; entries of any other type are
    ignored.  Returns "(no prompt found)" when nothing usable exists.
    """
    prompts = (task.get("generation_prompts") or {}).get("prompts") or {}
    for level in ("level_3", "level_4", "level_2", "level_1"):
        candidate = prompts.get(level)
        if not candidate:
            continue
        if isinstance(candidate, list):
            total = len(candidate)
            segments = [
                f"— 第 {number}/{total} 段 —\n{segment}"
                for number, segment in enumerate(candidate, 1)
            ]
            return "\n\n".join(segments)
        if isinstance(candidate, str):
            return candidate
    return "(no prompt found)"
def _render_video_html(url: str) -> str:
    """Return an autoplaying, muted, looping HTML5 ``<video>`` element for *url*."""
    attrs = " ".join((
        "controls",
        "autoplay",
        "muted",
        "loop",
        "playsinline",
        'width="100%"',
        'style="max-height:400px;object-fit:contain"',
        f'src="{url}"',
    ))
    return f"<video {attrs}>您的浏览器不支持 HTML5 视频。</video>"
# ---------------------------------------------------------------------------
# MBench-A: Auxiliary info rendering
# ---------------------------------------------------------------------------
def _render_mbench_a_aux(task: dict) -> str:
    """Render the auxiliary HTML panel shown above an MBench-A video pair.

    Every subset gets the expected-camera-motion GIF and, when present, the
    scene caption.  Subset-specific additions:

    * ``object``  - a hint plus the highlighted-object mask image;
    * ``human``   - a hint to watch the person's face/appearance;
    * ``causal``  - no extra hint;
    * any other   - treated as ``environment``: a hint to watch the overall
      scene layout, style and lighting.

    Args:
        task: MBench-A task dict.  Reads ``subset`` (required), ``sample_id``
            (required for ``object``) and optionally ``camera_motion``,
            ``camera_motion_description`` and ``caption``.

    Returns:
        HTML wrapped in a ``div.aux-info-box``.
    """
    import html

    subset = task["subset"]
    # Styling comes from the .aux-info-box CSS class (Gradio themes can
    # override inline styles).
    box = 'class="aux-info-box"'

    # Camera motion info (shown for ALL subsets).
    motion = task.get("camera_motion", "left_then_right")
    # Free text from the task pool is HTML-escaped so a stray "<" or "&"
    # cannot break (or inject into) the surrounding markup.
    motion_desc = html.escape(str(task.get("camera_motion_description", motion)))
    gif_url = _mbench_a_asset_hf_url(f"camera_diagrams/{motion}.gif")
    camera_html = (
        f'<div style="flex:0 0 200px">'
        f'<p><b>🎬 预期相机运动</b></p>'
        f'<p style="margin:0 0 8px">{motion_desc}</p>'
        f'<img src="{gif_url}" style="width:180px">'
        f'</div>'
    )

    # Caption (shown for ALL subsets when present).
    caption = task.get("caption", "")
    caption_html = ""
    if caption:
        caption_html = (
            f'<div style="flex:1;min-width:250px">'
            f'<p><b>📝 场景描述</b></p>'
            f'<p style="font-size:14px;line-height:1.5">{html.escape(str(caption))}</p>'
            f'</div>'
        )

    # Per-subset hint paragraph; unknown subsets fall back to the
    # environment hint, and "causal" has none.
    hints = {
        "object": '<p><b>🎯 请关注画面中被标注(高亮)的物体</b></p>',
        "human": '<p><b>👤 请关注视频中的人物</b>:观察人物离开画面再回来后,面部和外观是否保持一致。</p>',
        "causal": "",
    }
    hint_html = hints.get(
        subset,
        '<p><b>🏞️ 请关注整体场景</b>:观察相机转回来后,场景的布局、风格、光照是否保持一致。</p>',
    )

    # Only the object subset shows the mask image (before the camera panel).
    mask_html = ""
    if subset == "object":
        mask_url = _mbench_a_asset_hf_url(f"mask_viz/{task['sample_id']}.png")
        mask_html = (
            f'<div style="flex:1;min-width:300px">'
            f'<img src="{mask_url}" style="max-width:100%;max-height:280px">'
            f'</div>'
        )

    # The flex row gets a top margin only when a hint paragraph precedes it.
    row_style = "display:flex;gap:16px;flex-wrap:wrap;align-items:flex-start"
    if hint_html:
        row_style += ";margin-top:8px"

    return (
        f'<div {box}>'
        f'{hint_html}'
        f'<div style="{row_style}">'
        f'{mask_html}'
        f'{camera_html}'
        f'{caption_html}'
        f'</div></div>'
    )
# ---------------------------------------------------------------------------
# CommitScheduler
# ---------------------------------------------------------------------------
# Periodically pushes everything under ANN_DIR to "annotations/" in the
# dataset repo.  Without HF_TOKEN no scheduler is created and annotations
# are only written locally.
scheduler: CommitScheduler | None = (
    CommitScheduler(
        repo_id=DATASET_REPO,
        repo_type="dataset",
        folder_path=str(ANN_DIR),
        path_in_repo="annotations",
        every=COMMIT_INTERVAL_MIN,
        token=HF_TOKEN,
        private=False,
        squash_history=False,
    )
    if HF_TOKEN
    else None
)
# ---------------------------------------------------------------------------
# Historical annotations
# ---------------------------------------------------------------------------
def _fetch_remote_annotations() -> list[dict[str, Any]]:
    """Download every ``annotations/*.jsonl`` file from the dataset repo and
    return the parsed records.

    Best effort: if the repo listing fails an empty list is returned; a file
    that cannot be downloaded/read is skipped (records already read from it
    are kept); blank, malformed and non-object JSON lines are skipped.  Every
    skip is reported on stdout instead of being silently swallowed.  Only
    dict records are returned, because callers immediately use ``r.get``.
    """
    records: list[dict[str, Any]] = []
    try:
        api = HfApi(token=HF_TOKEN)
        files = api.list_repo_files(repo_id=DATASET_REPO, repo_type="dataset")
    except Exception as exc:
        print(f"[mbench-ann] WARNING: could not list remote annotations: {exc}")
        return records

    jsonl_paths = [p for p in files if p.startswith("annotations/") and p.endswith(".jsonl")]
    bad_lines = 0
    for path in jsonl_paths:
        try:
            local = hf_hub_download(
                repo_id=DATASET_REPO, filename=path, repo_type="dataset", token=HF_TOKEN,
            )
            with open(local, encoding="utf-8") as fh:
                for raw in fh:
                    raw = raw.strip()
                    if not raw:
                        continue
                    try:
                        record = json.loads(raw)
                    except ValueError:
                        bad_lines += 1
                        continue
                    if isinstance(record, dict):
                        records.append(record)
                    else:
                        bad_lines += 1
        except Exception as exc:
            print(f"[mbench-ann] WARNING: skipping remote annotation file {path}: {exc}")

    if bad_lines:
        print(f"[mbench-ann] WARNING: skipped {bad_lines} malformed remote annotation line(s)")
    print(f"[mbench-ann] loaded {len(records)} historical records from {len(jsonl_paths)} file(s)")
    return records


HISTORICAL = _fetch_remote_annotations()
# ---------------------------------------------------------------------------
# Shared state
# ---------------------------------------------------------------------------
# Guards the *_SUBMITTED / *_PENDING / MBENCH_A_COMPLETED structures below,
# which are shared by all concurrent Gradio sessions.
STATE_LOCK = threading.Lock()

# Binary: items already annotated (restricted to the current pool), and
# items currently claimed: (model, task_id) -> (annotator, claim time).
BINARY_SUBMITTED: set[tuple[str, str]] = {
    (r["model"], r["task_id"]) for r in HISTORICAL
    if r.get("type", "binary") == "binary" and "model" in r and "task_id" in r
    and (r["model"], r["task_id"]) in BINARY_POOL_SET
}
BINARY_PENDING: dict[tuple[str, str], tuple[str, float]] = {}

# MBench-V pairwise: keyed by (task_id, model_a, model_b, dimension).
# Restricted to PAIRWISE_POOL_SET, like the binary set above, so records from
# an older or larger pool cannot push the progress figures past 100%.
PAIRWISE_SUBMITTED: set[tuple[str, str, str, str]] = {
    key
    for key in (
        (r["task_id"], r["model_a"], r["model_b"], r["dimension"])
        for r in HISTORICAL
        if r.get("type") == "pairwise"
        and all(k in r for k in ("task_id", "model_a", "model_b", "dimension"))
    )
    if key in PAIRWISE_POOL_SET
}
PAIRWISE_PENDING: dict[tuple[str, str, str, str], tuple[str, float]] = {}

# MBench-A: task_id -> distinct annotators who completed it.
MBENCH_A_COMPLETED: dict[str, list[str]] = defaultdict(list)
for r in HISTORICAL:
    if r.get("type") != "pairwise_mbench_a" or "task_id" not in r or "annotator" not in r:
        continue
    tid = r["task_id"]
    # Ignore records for tasks that are not in the current task pool.
    if tid not in MBENCH_A_TASK_BY_ID:
        continue
    # Count an annotator at most once per task, so a repeated submission
    # cannot use up more than one of the MBENCH_A_ANNOTATORS_PER_TASK slots.
    if r["annotator"] not in MBENCH_A_COMPLETED[tid]:
        MBENCH_A_COMPLETED[tid].append(r["annotator"])
MBENCH_A_PENDING: dict[str, tuple[str, float]] = {}

print(f"[mbench-ann] binary submitted: {len(BINARY_SUBMITTED)}")
print(f"[mbench-ann] pairwise submitted: {len(PAIRWISE_SUBMITTED)}")
print(f"[mbench-ann] MBench-A completed: {sum(len(v) for v in MBENCH_A_COMPLETED.values())} annotations across {len(MBENCH_A_COMPLETED)} tasks")
# ---------------------------------------------------------------------------
# Queue helpers
# ---------------------------------------------------------------------------
def _reap_expired(pending_dict, timeout=None):
    """Remove claims older than *timeout* seconds from *pending_dict* in place.

    Args:
        pending_dict: mapping of item -> (annotator, claim_timestamp), where
            the timestamp is a ``time.time()`` value.  All current callers
            hold STATE_LOCK while calling this.
        timeout: claim lifetime in seconds; ``None`` (the default) means
            PENDING_TIMEOUT_SEC.

    Returns:
        The list of removed keys (existing callers ignore it).
    """
    if timeout is None:
        timeout = PENDING_TIMEOUT_SEC
    now = time.time()
    expired = [
        key for key, (_, claimed_at) in pending_dict.items()
        if now - claimed_at > timeout
    ]
    for key in expired:
        pending_dict.pop(key, None)
    return expired
def _append_annotation(record: dict[str, Any], ann_file: Path) -> None:
    """Append *record* to *ann_file* as one UTF-8 JSON line.

    When a CommitScheduler exists, the write is done while holding its lock
    so the scheduler does not read the file mid-write.
    """
    payload = json.dumps(record, ensure_ascii=False) + "\n"

    def _write() -> None:
        with ann_file.open("a", encoding="utf-8") as fh:
            fh.write(payload)

    if scheduler is None:
        _write()
        return
    with scheduler.lock:
        _write()
# ---------------------------------------------------------------------------
# Binary annotation callbacks (MBench-V)
# ---------------------------------------------------------------------------
def binary_start(annotator: str, state: dict):
    """Log *annotator* in to the binary tab and load their first item.

    Each login gets a fresh random visiting order over BINARY_POOL.  Returns
    the 6-tuple from ``_binary_next`` (state, video html, meta, prompt,
    status, stats); a blank name returns a warning tuple with *state*
    unchanged.
    """
    name = (annotator or "").strip()
    if not name:
        return state, "<p>请先输入名字。</p>", "", "", "⚠️ 请输入名字", ""
    visit_order = list(range(len(BINARY_POOL)))
    random.shuffle(visit_order)
    fresh_state = dict(annotator=name, order=visit_order, idx=0, current=None, count=0)
    return _binary_next(fresh_state)
def _binary_next(state):
    """Claim and render the next free binary item for this session.

    Walks ``state["order"]`` from ``state["idx"]``, skipping items that are
    already submitted or currently claimed by anyone.  The first free item is
    claimed in BINARY_PENDING and returned as (state, video html, meta,
    prompt, status, stats).  When the order is exhausted,
    ``state["current"]`` is cleared and a completion tuple is returned.
    """
    who = state["annotator"]
    order = state["order"]
    start = state.get("idx", 0)
    with STATE_LOCK:
        _reap_expired(BINARY_PENDING)
        for pos in range(start, len(order)):
            candidate = BINARY_POOL[order[pos]]
            if candidate in BINARY_SUBMITTED or candidate in BINARY_PENDING:
                continue
            BINARY_PENDING[candidate] = (who, time.time())
            state["idx"] = pos
            state["current"] = candidate
            model, task_id = candidate
            task = TASK_BY_ID[task_id]
            submitted = len(BINARY_SUBMITTED)
            total = len(BINARY_POOL)
            return (
                state,
                _render_video_html(_video_url(model, task_id)),
                f"**模型**: `{model}` | **task_id**: `{task_id}` | **已提交**: {state['count']}",
                _extract_prompt(task),
                "✅ 已加载",
                f"全局进度: {submitted}/{total} ({100*submitted/total:.1f}%)",
            )
    state["current"] = None
    return (
        state,
        "<p>🎉 全部完成!</p>",
        "全部标注完成",
        "",
        "完成",
        f"已完成 {len(BINARY_SUBMITTED)}/{len(BINARY_POOL)}",
    )
def binary_submit(state, verdict, note):
    """Record the verdict for the current binary item and load the next one.

    ``verdict == "是"`` is stored as ``memory_issue=True``.  Returns an
    8-tuple: state, video html, meta, prompt, the reset verdict ("否"), a
    cleared note, a status message and the stats line.
    """
    if not (state and state.get("current")):
        return state, "<p>请先登录</p>", "", "", "否", "", "⚠️", ""
    item = state["current"]
    model, task_id = item
    _append_annotation(
        {
            "type": "binary",
            "timestamp": time.time(),
            "annotator": state["annotator"],
            "model": model,
            "task_id": task_id,
            "memory_issue": verdict == "是",
            "verdict": verdict,
            "note": (note or "").strip(),
        },
        ANN_FILE_BINARY,
    )
    with STATE_LOCK:
        BINARY_PENDING.pop(item, None)
        BINARY_SUBMITTED.add(item)
    state["count"] = state.get("count", 0) + 1
    state["idx"] += 1
    state["current"] = None
    nxt = _binary_next(state)
    return (*nxt[:4], "否", "", f"✅ 已提交第 {state['count']} 条", nxt[5])
def binary_skip(state):
    """Release the current binary item without a verdict and load the next one.

    Returns the same 8-tuple shape as ``binary_submit``.
    """
    if not (state and state.get("current")):
        return state, "<p>请先登录</p>", "", "", "否", "", "⚠️", ""
    with STATE_LOCK:
        BINARY_PENDING.pop(state["current"], None)
    state["idx"] += 1
    state["current"] = None
    nxt = _binary_next(state)
    return (*nxt[:4], "否", "", "⏭️ 已跳过", nxt[5])
# ---------------------------------------------------------------------------
# MBench-V Pairwise annotation callbacks
# ---------------------------------------------------------------------------
def pairwise_start(annotator: str, dimension: str, state: dict):
    """Log *annotator* in to the pairwise tab for one *dimension*.

    The session's pool is every PAIRWISE_POOL item for that dimension (kept
    as ``(pool_index, item)`` pairs) in a fresh random order.  Returns the
    7-tuple from ``_pairwise_next``; a blank name returns a warning tuple.
    """
    name = (annotator or "").strip()
    if not name:
        return state, "<p>请先输入名字。</p>", "<p></p>", "", "", "⚠️ 请输入名字", ""
    dim_pool = list(filter(lambda entry: entry[1][3] == dimension, enumerate(PAIRWISE_POOL)))
    visit_order = list(range(len(dim_pool)))
    random.shuffle(visit_order)
    return _pairwise_next(
        dict(
            annotator=name,
            dimension=dimension,
            dim_pool=dim_pool,
            order=visit_order,
            idx=0,
            current=None,
            count=0,
        )
    )
def _pairwise_next(state):
    """Claim and render the next free pairwise item for this session.

    Walks ``state["order"]`` (indices into ``state["dim_pool"]``) from
    ``state["idx"]``, skipping items already submitted or claimed by anyone.
    The first free item is claimed in PAIRWISE_PENDING, and the left/right
    placement of its two models is randomised (``state["swapped"]`` records
    it so the verdict can be mapped back to model names).

    Returns (state, left video html, right video html, meta, prompt, status,
    stats); when nothing is left, ``state["current"]`` is cleared and a
    completion tuple is returned.
    """
    annotator = state["annotator"]
    dim_pool = state["dim_pool"]
    order = state["order"]
    idx = state.get("idx", 0)
    dimension = state["dimension"]
    # Human-readable label/question for the dimension (falls back to the key).
    dim_label, dim_question = next(
        ((label, question) for key, label, question in PAIRWISE_DIMENSIONS if key == dimension),
        (dimension, ""),
    )
    with STATE_LOCK:
        _reap_expired(PAIRWISE_PENDING)
        while idx < len(order):
            _, item = dim_pool[order[idx]]
            if item in PAIRWISE_SUBMITTED or item in PAIRWISE_PENDING:
                idx += 1
                continue
            tid, m_a, m_b = item[0], item[1], item[2]
            PAIRWISE_PENDING[item] = (annotator, time.time())
            state["idx"] = idx
            state["current"] = item
            if random.random() < 0.5:
                left_model, right_model = m_a, m_b
                state["swapped"] = False
            else:
                left_model, right_model = m_b, m_a
                state["swapped"] = True
            task = TASK_BY_ID[tid]
            video_a_html = _render_video_html(_video_url(left_model, tid))
            video_b_html = _render_video_html(_video_url(right_model, tid))
            prompt = _extract_prompt(task)
            meta = f"**维度**: {dim_label} | **问题**: {dim_question}\n\n**已提交**: {state['count']}"
            # Count progress over this dimension's pool only.  PAIRWISE_SUBMITTED
            # is seeded from history and may hold items outside the pool, which
            # would otherwise push the percentage past 100%.
            n_sub = sum(1 for _, pool_item in dim_pool if pool_item in PAIRWISE_SUBMITTED)
            n_total = len(dim_pool)
            stats = f"维度「{dim_label}」进度: {n_sub}/{n_total} ({100*n_sub/n_total:.1f}%)"
            return state, video_a_html, video_b_html, meta, prompt, "✅ 已加载", stats
    state["current"] = None
    return state, "<p>🎉 该维度全部完成!</p>", "", "全部完成", "", "完成", ""
def pairwise_submit(state, verdict, note):
    """Record the verdict for the current pairwise item and load the next one.

    "左边更好" / "右边更好" credit whichever model was displayed on that side
    (undoing the random swap); anything else is stored as ``"tie"``.
    Returns the same 7-tuple shape as ``_pairwise_next`` with a submission
    status message.
    """
    if not (state and state.get("current")):
        return state, "", "", "", "", "⚠️ 请先登录", ""
    item = state["current"]
    tid, m_a, m_b, dimension = item
    swapped = state.get("swapped", False)
    shown_left, shown_right = (m_b, m_a) if swapped else (m_a, m_b)
    winner = {"左边更好": shown_left, "右边更好": shown_right}.get(verdict, "tie")
    _append_annotation(
        {
            "type": "pairwise",
            "timestamp": time.time(),
            "annotator": state["annotator"],
            "task_id": tid,
            "model_a": m_a,
            "model_b": m_b,
            "dimension": dimension,
            "winner": winner,
            "verdict_raw": verdict,
            "swapped": swapped,
            "note": (note or "").strip(),
        },
        ANN_FILE_PAIRWISE,
    )
    with STATE_LOCK:
        PAIRWISE_PENDING.pop(item, None)
        PAIRWISE_SUBMITTED.add(item)
    state["count"] = state.get("count", 0) + 1
    state["idx"] += 1
    state["current"] = None
    nxt = _pairwise_next(state)
    return (*nxt[:5], f"✅ 已提交第 {state['count']} 条", nxt[6])
def pairwise_skip(state):
    """Release the current pairwise item without a verdict and load the next one."""
    if not (state and state.get("current")):
        return state, "", "", "", "", "⚠️ 请先登录", ""
    with STATE_LOCK:
        PAIRWISE_PENDING.pop(state["current"], None)
    state["idx"] += 1
    state["current"] = None
    nxt = _pairwise_next(state)
    return (*nxt[:5], "⏭️ 已跳过", nxt[6])
# ---------------------------------------------------------------------------
# MBench-A Pairwise annotation callbacks
# ---------------------------------------------------------------------------
def _local_mbench_a_task_ids(annotator: str) -> set[str]:
    """Return pool task ids that *annotator* completed according to local files.

    Scans every ``ann_mbench_a_*.jsonl`` in ANN_DIR, not only this process's
    file, so records left on disk by an earlier process are counted too.
    Unreadable files and malformed lines are skipped.
    """
    task_ids: set[str] = set()
    for path in sorted(ANN_DIR.glob("ann_mbench_a_*.jsonl")):
        try:
            # Records are written as UTF-8 with ensure_ascii=False, so the
            # encoding must be explicit (the locale default may differ).
            with path.open(encoding="utf-8") as fh:
                for line in fh:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        r = json.loads(line)
                    except ValueError:
                        continue
                    if not isinstance(r, dict):
                        continue
                    tid = r.get("task_id")
                    if (r.get("type") == "pairwise_mbench_a"
                            and r.get("annotator") == annotator
                            and isinstance(tid, str)
                            and tid in MBENCH_A_TASK_BY_ID):
                        task_ids.add(tid)
        except (OSError, UnicodeDecodeError) as exc:
            print(f"[mbench-ann] WARNING: could not read {path}: {exc}")
    return task_ids


def mbench_a_start(annotator: str, state: dict):
    """Log *annotator* in to the MBench-A tab and load their first task.

    The per-annotator counter starts from the number of *distinct* pool
    tasks this annotator has already completed.  These are taken from
    MBENCH_A_COMPLETED (remote history plus this process's submissions)
    together with any local MBench-A annotation files on disk.  A task seen
    in both sources, or on several lines, is counted once.

    Returns the 14 values wired to ``a_all_outs`` (state, status, aux html,
    left/right video html, meta, six question radios, note, stats).
    """
    annotator = (annotator or "").strip()
    if not annotator:
        # Must match a_all_outs exactly: 6 leading values, 6 radio updates
        # (one per question slot), note and stats.
        return (state, "⚠️ 请输入名字", "", "", "", "",
                gr.update(visible=False), gr.update(visible=False),
                gr.update(visible=False), gr.update(visible=False),
                gr.update(visible=False), gr.update(visible=False),
                "", "")

    # MBENCH_A_COMPLETED is mutated by other sessions; snapshot it under the lock.
    with STATE_LOCK:
        done_ids = {
            tid for tid, names in MBENCH_A_COMPLETED.items()
            if annotator in names
        }
    done_ids |= _local_mbench_a_task_ids(annotator)

    # Shuffle task order for this annotator.
    order = list(range(len(MBENCH_A_TASKS)))
    random.shuffle(order)
    state = {
        "annotator": annotator,
        "order": order,
        "idx": 0,
        "current_task_id": None,
        "swapped": False,
        "left_model": None,
        "right_model": None,
        "count": len(done_ids),
    }
    return _mbench_a_next(state)
def _mbench_a_next(state: dict):
    """Claim and render the next MBench-A task for this session.

    Walks ``state["order"]`` from ``state["idx"]`` and picks the first task
    that (a) has fewer than MBENCH_A_ANNOTATORS_PER_TASK completions,
    (b) this annotator has not completed, and (c) is not claimed by another
    annotator.  The task is claimed in MBENCH_A_PENDING and the left/right
    model order is randomised (recorded in ``state["swapped"]``).

    Returns the 14 values for ``a_all_outs``: state, status, aux html, left
    and right video html, meta, six question-radio updates (one visible
    radio per task dimension, the remaining slots hidden), note reset, stats.
    """
    who = state["annotator"]
    order = state["order"]
    pos = state.get("idx", 0)
    with STATE_LOCK:
        _reap_expired(MBENCH_A_PENDING)
        while pos < len(order):
            task = MBENCH_A_TASKS[order[pos]]
            tid = task["task_id"]
            done_by = MBENCH_A_COMPLETED.get(tid, [])
            claim = MBENCH_A_PENDING.get(tid)
            unavailable = (
                len(done_by) >= MBENCH_A_ANNOTATORS_PER_TASK   # quota reached
                or who in done_by                               # already done by this annotator
                or (claim is not None and claim[0] != who)      # claimed by someone else
            )
            if unavailable:
                pos += 1
                continue

            MBENCH_A_PENDING[tid] = (who, time.time())
            state["idx"] = pos
            state["current_task_id"] = tid

            model_a, model_b = task["model_a"], task["model_b"]
            swap = random.random() >= 0.5
            if swap:
                state["left_model"], state["right_model"] = model_b, model_a
            else:
                state["left_model"], state["right_model"] = model_a, model_b
            state["swapped"] = swap

            subset = task["subset"]
            left_html = _render_video_html(
                _mbench_a_video_proxy_url(state["left_model"], subset, task["sample_id"]))
            right_html = _render_video_html(
                _mbench_a_video_proxy_url(state["right_model"], subset, task["sample_id"]))
            aux_html = _render_mbench_a_aux(task)

            dims = task["dimensions"]
            questions = task.get("dimension_questions", {})
            # The UI has six radio slots: one per dimension, the rest hidden.
            radio_updates = [
                gr.update(visible=True, label=questions.get(dims[slot], dims[slot]), value="差不多")
                if slot < len(dims)
                else gr.update(visible=False, value="差不多")
                for slot in range(6)
            ]

            subset_names = {"environment": "🏞️ Environment", "object": "🎯 Object",
                            "human": "👤 Human", "causal": "⚡ Causal"}
            fully_done = sum(
                1 for t in MBENCH_A_TASKS
                if len(MBENCH_A_COMPLETED.get(t["task_id"], [])) >= MBENCH_A_ANNOTATORS_PER_TASK
            )
            meta = f"**子集**: {subset_names.get(subset, subset)} | **已提交**: {state['count']}"
            stats = (f"全局进度: {fully_done}/{len(MBENCH_A_TASKS)} tasks 完成 | "
                     f"你已标注: {state['count']}")
            return (state, "✅ 已加载", aux_html, left_html, right_html, meta,
                    *radio_updates, "", stats)

    # Nothing left for this annotator.
    state["current_task_id"] = None
    empty_q = gr.update(visible=False, value="差不多")
    return (state, "🎉 全部完成!", "", "<p>所有任务已完成</p>", "", "全部完成",
            *([empty_q] * 6), "", "")
def mbench_a_submit(state, q1_val, q2_val, q3_val, q4_val, q5_val, q6_val, note):
    """Record the per-dimension verdicts for the current MBench-A task.

    Each radio value is "A更好" (left video better), "B更好" (right video
    better) or anything else (stored as ``"tie"``).  Left/right are mapped
    back to ``model_a``/``model_b`` using ``state["swapped"]``.  Only the
    dimensions that had a radio slot (the first six) are recorded.

    Returns the 14 values for ``a_all_outs`` for the next task.
    """
    if not state or not state.get("current_task_id"):
        empty_q = gr.update(visible=False, value="差不多")
        return (state, "⚠️ 请先登录", "", "", "", "",
                empty_q, empty_q, empty_q, empty_q, empty_q, empty_q, "", "")
    tid = state["current_task_id"]
    task = MBENCH_A_TASK_BY_ID[tid]
    dimensions = task["dimensions"]
    swapped = state["swapped"]
    m_a, m_b = task["model_a"], task["model_b"]
    # Models that were displayed as "A" (left) and "B" (right).
    shown_a, shown_b = (m_b, m_a) if swapped else (m_a, m_b)

    verdicts = (q1_val, q2_val, q3_val, q4_val, q5_val, q6_val)
    dim_results = {}
    # zip() stops at the six UI slots.  _mbench_a_next only renders the first
    # six dimensions; indexing verdicts[i] used to raise IndexError for tasks
    # with more than six.
    for dim_key, v in zip(dimensions, verdicts):
        if v == "A更好":
            winner = shown_a
        elif v == "B更好":
            winner = shown_b
        else:
            winner = "tie"
        dim_results[dim_key] = winner

    record = {
        "type": "pairwise_mbench_a",
        "timestamp": time.time(),
        "annotator": state["annotator"],
        "task_id": tid,
        "subset": task["subset"],
        "sample_id": task["sample_id"],
        "camera_motion": task.get("camera_motion", "left_then_right"),
        "model_a": m_a,
        "model_b": m_b,
        "dimensions": dim_results,
        "swapped": swapped,
        "note": (note or "").strip(),
    }
    _append_annotation(record, ANN_FILE_MBENCH_A)
    with STATE_LOCK:
        MBENCH_A_PENDING.pop(tid, None)
        # The same annotator can hold the same task in two browser tabs
        # (_mbench_a_next re-grants a task to its current claimant).  Record
        # them once, so they do not use up several of the per-task slots.
        if state["annotator"] not in MBENCH_A_COMPLETED[tid]:
            MBENCH_A_COMPLETED[tid].append(state["annotator"])
    state["count"] = state.get("count", 0) + 1
    state["idx"] = state["idx"] + 1
    state["current_task_id"] = None
    return _mbench_a_next(state)
def mbench_a_skip(state):
    """Release the current MBench-A task without recording it and load the next one.

    Returns the 14 values for ``a_all_outs``.
    """
    if not (state and state.get("current_task_id")):
        hidden = gr.update(visible=False, value="差不多")
        return (state, "⚠️ 请先登录", "", "", "", "",
                *([hidden] * 6), "", "")
    with STATE_LOCK:
        MBENCH_A_PENDING.pop(state["current_task_id"], None)
    state["idx"] += 1
    state["current_task_id"] = None
    return _mbench_a_next(state)
# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
CUSTOM_CSS = """
#prompt_box textarea { height: 300px !important; overflow-y: auto !important; }
.video-pair { display: flex; gap: 12px; }
.video-pair > div { flex: 1; }
/* Force aux info box to be visible regardless of Gradio theme */
.aux-info-box {
background: #e3e8ef !important;
color: #111 !important;
padding: 14px !important;
border-radius: 8px !important;
margin-bottom: 12px !important;
border: 1px solid #b0b8c4 !important;
}
.aux-info-box * {
color: #111 !important;
}
.aux-info-box img {
border: 1px solid #999;
border-radius: 4px;
}
"""

with gr.Blocks(title="MBench 标注", theme=gr.themes.Soft(), css=CUSTOM_CSS) as demo:
    gr.Markdown("# 🎬 MBench 视频标注平台")
    with gr.Tabs():
        # MBench-A pairwise tab (the only tab built here).
        with gr.Tab("MBench-A 对比 (World Models)"):
            gr.Markdown(
                "## 🌍 MBench-A — 世界模型记忆能力评测\n\n"
                "比较两个世界模型生成的长视频(~25 秒),评估相机转走再转回来后的记忆一致性。\n\n"
                "**视频 A/B 的模型身份已匿名随机分配。请对每个维度独立判断。**"
            )
            a_stats = gr.Markdown("")
            a_state = gr.State({})

            # Login row.
            with gr.Row():
                a_name = gr.Textbox(label="标注员名字", placeholder="例如: charlie", scale=4)
                a_login = gr.Button("开始标注", variant="primary", scale=1)
            a_status = gr.Markdown("")

            # Auxiliary info (mask image / camera GIF + caption / instructions).
            a_aux = gr.HTML("")

            # Side-by-side video pair.
            with gr.Row(equal_height=True):
                with gr.Column(scale=1, min_width=360):
                    gr.Markdown("### 视频 A")
                    a_video_left = gr.HTML("<p>请先登录。</p>")
                with gr.Column(scale=1, min_width=360):
                    gr.Markdown("### 视频 B")
                    a_video_right = gr.HTML("<p>请先登录。</p>")

            a_meta = gr.Markdown("")

            # Six question slots, hidden until a task assigns them a dimension.
            gr.Markdown("---\n### 请对以下每个维度分别判断:")
            a_q1, a_q2, a_q3, a_q4, a_q5, a_q6 = (
                gr.Radio(["A更好", "差不多", "B更好"], value="差不多", label=f"维度 {n}", visible=False)
                for n in range(1, 7)
            )
            a_note = gr.Textbox(label="备注(可选)", lines=1)
            with gr.Row():
                a_submit = gr.Button("✅ 提交并下一组", variant="primary")
                a_skip = gr.Button("⏭️ 跳过")

            # Event wiring: every callback returns one value per entry here.
            a_all_outs = [
                a_state, a_status, a_aux, a_video_left, a_video_right, a_meta,
                a_q1, a_q2, a_q3, a_q4, a_q5, a_q6, a_note, a_stats,
            ]
            a_login.click(mbench_a_start, [a_name, a_state], a_all_outs)
            a_name.submit(mbench_a_start, [a_name, a_state], a_all_outs)
            a_submit.click(
                mbench_a_submit,
                [a_state, a_q1, a_q2, a_q3, a_q4, a_q5, a_q6, a_note],
                a_all_outs,
            )
            a_skip.click(mbench_a_skip, [a_state], a_all_outs)
# ---------------------------------------------------------------------------
# Video proxy
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Only needed when serving: same-origin proxy routes for the videos.
    import httpx
    from fastapi import HTTPException, Request
    from fastapi.responses import StreamingResponse
    from gradio.routes import App as _GradioApp

    # One async client shared by all proxied requests (follows HF redirects).
    _video_client = httpx.AsyncClient(timeout=30.0, follow_redirects=True)

    # (category, sample_id) pairs referenced by the MBench-A task pool.  Only
    # these are proxied, mirroring the (model, task_id) check for MBench-V.
    # sample_id is compared as str because path parameters arrive as strings.
    _MBENCH_A_KNOWN_SAMPLES = {
        (MBENCH_A_CATEGORY_MAP.get(t.get("subset")), str(t["sample_id"]))
        for t in MBENCH_A_TASKS
        if "sample_id" in t
    }

    async def _do_proxy(upstream: str, request: Request):
        """Stream *upstream* back to the client, forwarding any Range header.

        The upstream request uses HEAD for HEAD requests, so header probes do
        not download the whole video.  Selected upstream headers are passed
        through; content-type defaults to video/mp4.
        """
        req_headers = {}
        if (rng := request.headers.get("range")):
            req_headers["range"] = rng
        method = "HEAD" if request.method == "HEAD" else "GET"
        try:
            upstream_resp = await _video_client.send(
                _video_client.build_request(method, upstream, headers=req_headers),
                stream=True,
            )
        except Exception as e:
            raise HTTPException(502, f"upstream fetch failed: {e}") from e
        passthrough_headers = {}
        for h in ("content-type", "content-length", "accept-ranges",
                  "content-range", "etag", "last-modified"):
            if h in upstream_resp.headers:
                passthrough_headers[h] = upstream_resp.headers[h]
        passthrough_headers.setdefault("content-type", "video/mp4")
        passthrough_headers["cache-control"] = "public, max-age=300"

        async def _body():
            # Always release the upstream connection, even if the client
            # disconnects mid-stream.
            try:
                async for chunk in upstream_resp.aiter_bytes(chunk_size=65536):
                    yield chunk
            finally:
                await upstream_resp.aclose()

        return StreamingResponse(_body(), status_code=upstream_resp.status_code, headers=passthrough_headers)

    async def _proxy_video(model: str, task_id: str, request: Request):
        """Proxy MBench-V videos (only known models and task ids)."""
        if model not in MODELS or task_id not in TASK_BY_ID:
            raise HTTPException(404, "unknown (model, task_id)")
        upstream = _hf_video_url(model, task_id)
        return await _do_proxy(upstream, request)

    async def _proxy_mbench_a_video(model: str, category: str, sample_id: str, request: Request):
        """Proxy MBench-A videos (only known models and pool samples)."""
        if model not in MBENCH_A_MODELS:
            raise HTTPException(404, f"unknown model: {model}")
        if (category, sample_id) not in _MBENCH_A_KNOWN_SAMPLES:
            raise HTTPException(404, "unknown (category, sample_id)")
        upstream = _mbench_a_hf_video_url(model, category, sample_id)
        return await _do_proxy(upstream, request)

    # Register the proxy routes by wrapping Gradio's app factory.
    _orig_create_app = _GradioApp.create_app

    def _patched_create_app(*args, **kwargs):
        app = _orig_create_app(*args, **kwargs)
        # MBench-V video proxy
        app.add_api_route(
            "/video/{model}/{task_id}.mp4",
            _proxy_video,
            methods=["GET", "HEAD"],
            include_in_schema=False,
        )
        # MBench-A video proxy
        app.add_api_route(
            "/video_a/{model}/{category}/{sample_id}/left_then_right.mp4",
            _proxy_mbench_a_video,
            methods=["GET", "HEAD"],
            include_in_schema=False,
        )
        print("[mbench-ann] video proxy routes registered (MBench-V + MBench-A)")
        return app

    _GradioApp.create_app = staticmethod(_patched_create_app)
    demo.queue(default_concurrency_limit=16).launch(ssr_mode=False)