"""OneVision Encoder Codec View. A simplified, dependency-light port of the codec_tools pipeline from lmms-eval-ov2. The original tool relies on a bitcost-patched ffmpeg 5.1 to score every macroblock by its actual encoding bit cost; we approximate that saliency signal with a Sobel gradient magnitude per patch (high gradient = high local complexity = roughly what the encoder would spend bits on). Pipeline (mirrors codec_tools/pipeline/process_video_bitcost_readiness.py): 1. Uniformly sample N frames from the input video. 2. smart_resize each frame so dims are multiples of `patch` and the total pixel count <= max_pixels. 3. Slice every frame into a patch grid; score each patch by its Sobel gradient magnitude mean. 4. Pick the top-K highest-scoring patches under the selected GOP grouping. 5. Render a "selection visualization" video: kept patches stay in full color, dropped patches are faded to a gray-white wash so the viewer can see exactly which patches the codec stage chose. 6. Pack one canvas per GOP group: the first frame of each group is kept whole as the I-frame, and later frames contribute only their selected patches packed below it in time order. 
""" import json import math import os import shutil import subprocess import tempfile import time from typing import List, Tuple import cv2 import gradio as gr import imageio_ffmpeg import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import numpy as np PATCH_CHOICES = [14, 16, 28] PATCH_OUTLINE_OUTER_BGR = (42, 23, 15) # dark slate PATCH_OUTLINE_INNER_BGR = (11, 158, 245) # amber DEMO_VIDEO_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), "examples", "demo_codec_heatmap.mp4", ) DEMO_PRESET = ( DEMO_VIDEO_PATH, # video_in 32, # sample_frames 14, # patch_size 1024, # total_patches 150000, # max_pixels "sbs", # viz_mode 0.55, # heatmap_alpha 0.0, 0.0, # start_sec, end_sec "combined", # saliency_signal True, # score_log_scale 96.0, # bitcost_pct 0.55, # fade_strength "dynamic", # gop ) def smart_resize(frame: np.ndarray, max_pixels: int, factor: int) -> np.ndarray: """Resize so h,w are multiples of `factor` and h*w <= max_pixels.""" h, w = frame.shape[:2] pixels = h * w if pixels > max_pixels: scale = math.sqrt(max_pixels / pixels) h = max(factor, int(h * scale)) w = max(factor, int(w * scale)) h = max(factor, (h // factor) * factor) w = max(factor, (w // factor) * factor) return cv2.resize(frame, (w, h), interpolation=cv2.INTER_AREA) def sample_frame_ids(total: int, n: int) -> List[int]: if total <= 0: return [] if n >= total: return list(range(total)) return [int(round(i)) for i in np.linspace(0, total - 1, n)] def split_budget_evenly(total_k: int, n_parts: int) -> List[int]: total = max(0, int(total_k)) n = max(0, int(n_parts)) if n == 0: return [] base, rem = divmod(total, n) return [base + (1 if i < rem else 0) for i in range(n)] def sample_window_frame_ids(start: int, end: int, n: int) -> List[int]: start_i = int(start) end_i = int(end) count = max(0, int(n)) if end_i < start_i or count <= 0: return [] total = end_i - start_i + 1 if count >= total: return list(range(start_i, end_i + 1)) return [start_i + x for x in 
def decode_frames(video_path: str, frame_ids: List[int]) -> List[np.ndarray]:
    """Decode the requested frame indices from `video_path`.

    Seeks to each index in turn and reads one frame. Frames that fail to
    read are silently skipped, so the returned list can be SHORTER than
    `frame_ids` and is then no longer index-aligned with it. Returns ``[]``
    when the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return []
        frames: List[np.ndarray] = []
        for fid in frame_ids:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(fid))
            ok, fr = cap.read()
            if ok:
                frames.append(fr)
        return frames
    finally:
        # Release the capture on every exit path, including exceptions
        # raised while seeking or reading.
        cap.release()


def video_metadata(video_path: str) -> dict:
    """Collect basic stream metadata for `video_path`.

    Always returns ``total_frames``, ``fps``, ``width`` and ``height`` as
    reported by OpenCV. When an ``ffprobe`` executable is on PATH, also adds
    ``codec``, ``pix_fmt``, ``profile`` and ``bitrate_bps``; any ffprobe
    failure is recorded under ``ffprobe_error`` instead of being raised.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = float(cap.get(cv2.CAP_PROP_FPS) or 0.0)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        cap.release()
    meta = {
        "total_frames": total,
        "fps": round(fps, 3),
        "width": w,
        "height": h,
    }
    if shutil.which("ffprobe"):
        try:
            r = subprocess.run(
                [
                    "ffprobe", "-v", "quiet",
                    "-select_streams", "v:0",
                    "-show_entries",
                    "stream=codec_name,bit_rate,pix_fmt,profile",
                    "-of", "json",
                    video_path,
                ],
                capture_output=True,
                text=True,
                check=True,
                timeout=15,
            )
            # ffprobe emits `"streams": []` when there is no video stream;
            # treat that like "no details" rather than an IndexError.
            streams = json.loads(r.stdout).get("streams") or [{}]
            data = streams[0]
            meta["codec"] = data.get("codec_name")
            meta["pix_fmt"] = data.get("pix_fmt")
            meta["profile"] = data.get("profile")
            meta["bitrate_bps"] = data.get("bit_rate")
        except Exception as e:
            # Deliberately best-effort: ffprobe details are optional extras.
            meta["ffprobe_error"] = str(e)
    return meta


def _patch_mean(field: np.ndarray, patch: int) -> np.ndarray:
    """Average a 2-D field over non-overlapping `patch` x `patch` tiles.

    Rows/columns that do not fill a whole tile are dropped. Returns an
    array of shape ``[h // patch, w // patch]``.
    """
    h, w = field.shape
    hb, wb = h // patch, w // patch
    field = field[: hb * patch, : wb * patch]
    return field.reshape(hb, patch, wb, patch).mean(axis=(1, 3))


def patch_score_grid(frame_bgr: np.ndarray, patch: int) -> np.ndarray:
    """Return [hb, wb] grid of Sobel gradient magnitude means per patch."""
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY).astype(np.float32)
    gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3)
    gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3)
    mag = np.sqrt(gx * gx + gy * gy)
    return _patch_mean(mag, patch).astype(np.float32)


def patch_score_frame_diff(
    prev_bgr: np.ndarray,
    cur_bgr: np.ndarray,
    patch: int,
) -> np.ndarray:
    """Inter-frame absdiff per patch — proxy for motion / temporal complexity.

    `prev_bgr` may be None (first frame) or have a different shape; in both
    cases the Sobel gradient grid of `cur_bgr` is returned instead. Note
    that this fallback is on a different numeric scale from the absdiff
    scores of the other frames.
    """
    if prev_bgr is None or prev_bgr.shape != cur_bgr.shape:
        return patch_score_grid(cur_bgr, patch)
    # Mean over the colour axis gives one difference value per pixel.
    diff = cv2.absdiff(prev_bgr, cur_bgr).mean(axis=2).astype(np.float32)
    return _patch_mean(diff, patch)
def _frame_diff_grids(frames: List[np.ndarray], patch: int) -> List[np.ndarray]:
    """Score every frame against its predecessor (the first has none)."""
    previous = [None] + list(frames[:-1])
    return [
        patch_score_frame_diff(prev, cur, patch)
        for prev, cur in zip(previous, frames)
    ]


def compute_score_grids(
    frames: List[np.ndarray],
    patch: int,
    signal: str,
) -> List[np.ndarray]:
    """Build per-frame patch score grids from one of three signals:

      - 'gradient'   — Sobel magnitude only (intra-frame complexity)
      - 'frame_diff' — absdiff vs previous frame (temporal motion)
      - 'combined'   — 0.5 * gradient_norm + 0.5 * frame_diff_norm

    For 'combined', each component is independently shifted to [0,1] across
    the whole sample so they contribute on equal footing. A falsy `signal`
    means 'gradient'; any other unrecognised value is treated as 'combined'.
    An empty `frames` list yields ``[]`` for every signal (the 'combined'
    path used to crash in ``np.stack``).
    """
    if not frames:
        return []
    sig = (signal or "gradient").lower()
    if sig == "gradient":
        return [patch_score_grid(f, patch) for f in frames]
    if sig == "frame_diff":
        return _frame_diff_grids(frames, patch)

    # combined
    g = np.stack([patch_score_grid(f, patch) for f in frames], axis=0)
    d = np.stack(_frame_diff_grids(frames, patch), axis=0)

    def _norm01(a: np.ndarray) -> np.ndarray:
        # Shift to zero, then scale by the max unless the array is flat.
        a = a.astype(np.float32) - a.min()
        m = a.max()
        return a / m if m > 1e-8 else a

    combined = 0.5 * _norm01(g) + 0.5 * _norm01(d)
    return [combined[i] for i in range(combined.shape[0])]


def topk_mask(score: np.ndarray, k: int) -> np.ndarray:
    """Per-frame top-K mask (legacy helper, no longer used by process()).

    Returns a uint8 array shaped like `score` with 1 at the `k` largest
    entries. `k <= 0` selects nothing; `k >= score.size` selects everything.
    """
    flat = score.flatten()
    if k >= flat.size:
        return np.ones_like(score, dtype=np.uint8)
    if k <= 0:
        return np.zeros_like(score, dtype=np.uint8)
    out = np.zeros(flat.size, dtype=np.uint8)
    # argpartition puts the k largest values in the last k slots (unordered).
    keep_idx = np.argpartition(flat, -k)[-k:]
    out[keep_idx] = 1
    return out.reshape(score.shape)


def global_topk_masks(
    grids: List[np.ndarray],
    total_k: int,
) -> Tuple[List[np.ndarray], int]:
    """Pick the top `total_k` highest-scoring patches GLOBALLY across all
    sampled frames, return one mask per frame plus the actual count.

    Some frames may end up with zero patches (low energy throughout) while
    others may contribute many — that's the whole point: the codec-style
    saliency lets the budget concentrate where it matters. All grids must
    share one shape because they are stacked into a single array.
    """
    if not grids:
        return [], 0
    arr = np.stack(grids, axis=0).astype(np.float32)  # [N, hb, wb]
    n_frames, hb, wb = arr.shape
    flat = arr.reshape(-1)
    k = int(total_k)
    if k >= flat.size:
        # Budget covers every patch: select all and report the real total.
        return [np.ones((hb, wb), dtype=np.uint8) for _ in range(n_frames)], int(flat.size)
    if k <= 0:
        return [np.zeros((hb, wb), dtype=np.uint8) for _ in range(n_frames)], 0
    mask_flat = np.zeros(flat.size, dtype=np.uint8)
    keep_idx = np.argpartition(flat, -k)[-k:]
    mask_flat[keep_idx] = 1
    stacked = mask_flat.reshape(n_frames, hb, wb)
    return [stacked[i].astype(np.uint8) for i in range(n_frames)], k
def build_dynamic_groups(
    grids: List[np.ndarray],
    min_group_frames: int = 8,
    max_group_frames: int = 64,
    preferred_group_frames: int = 32,
) -> List[Tuple[int, int]]:
    """Split the sampled frames into contiguous groups by saliency energy.

    Each frame's energy is the sum of its score grid. A group is closed once
    it has gathered an equal share of the total energy (and is at least
    `min_group_frames` long) or once it reaches `max_group_frames`, provided
    enough frames remain to give every later group its minimum length. The
    final group takes whatever is left, so its length is only roughly bound
    by the limits. Returns inclusive ``(start, end)`` index pairs; a sample
    that fits in one maximum-size group yields a single group.
    """
    n_frames = len(grids)
    if n_frames == 0:
        return []

    shortest = max(1, int(min_group_frames))
    longest = max(shortest, int(max_group_frames))
    preferred = min(longest, max(shortest, int(preferred_group_frames)))
    single = [(0, n_frames - 1)]
    if n_frames <= longest:
        return single

    # Group count: aim for the preferred length, clamped to what the
    # min/max lengths allow.
    fewest = max(1, math.ceil(n_frames / longest))
    most = max(1, n_frames // shortest)
    wanted = max(1, math.ceil(n_frames / preferred))
    wanted = min(max(wanted, fewest), most)
    if wanted <= 1:
        return single

    energies = np.array([float(g.sum()) for g in grids], dtype=np.float64)
    total_energy = energies.sum()

    spans: List[Tuple[int, int]] = []
    if total_energy <= 1e-8:
        # No usable energy signal: fall back to fixed-size chunks, with the
        # last chunk absorbing the remainder.
        chunk = max(shortest, min(longest, math.ceil(n_frames / wanted)))
        head = 0
        while head < n_frames and len(spans) < wanted:
            if len(spans) == wanted - 1:
                tail = n_frames - 1
            else:
                tail = min(n_frames - 1, head + chunk - 1)
            spans.append((head, tail))
            head = tail + 1
        return spans

    quota = total_energy / wanted
    head = 0
    running = 0.0
    for idx, energy in enumerate(energies):
        running += energy
        length = idx - head + 1
        groups_after = wanted - len(spans) - 1
        frames_after = n_frames - idx - 1
        # Closing is allowed only while another group is still owed and the
        # remaining frames can give each later group its minimum length.
        can_close = len(spans) < wanted - 1 and frames_after >= groups_after * shortest
        if not can_close:
            continue
        if length >= longest or (running >= quota and length >= shortest):
            spans.append((head, idx))
            head = idx + 1
            running = 0.0
    if head <= n_frames - 1:
        spans.append((head, n_frames - 1))
    return spans
def grouped_topk_masks(
    grids: List[np.ndarray],
    total_k: int,
    gop: str,
) -> Tuple[List[np.ndarray], int, List[Tuple[int, int]], str]:
    """Choose patches under a GOP grouping strategy.

    GOP modes:
      - "global" (also "none", "0", empty/None): one group spanning the
        whole sample; plain global top-K.
      - an integer string such as "4"/"8"/"16": fixed group size in frames.
        The budget is split evenly across groups and top-K is taken inside
        each group. A non-numeric string falls back to one group covering
        everything.
      - "dynamic": adaptive groups from `build_dynamic_groups`.

    A group whose even share exceeds its patch capacity is capped, and the
    surplus is handed out one patch at a time to groups with room left.

    Returns (per-frame masks, actual selected count, [(start, end), ...]
    groups, resolved label). With no grids the label is `gop` unchanged.
    """
    frame_count = len(grids)
    if frame_count == 0:
        return [], 0, [], gop

    mode = (gop or "global").strip().lower()
    if mode in ("global", "none", "0", ""):
        masks, picked = global_topk_masks(grids, int(total_k))
        return masks, picked, [(0, frame_count - 1)], "global"

    if mode == "dynamic":
        groups = build_dynamic_groups(grids)
        resolved_label = "codec-stream"
    else:
        try:
            span = max(1, int(mode))
        except ValueError:
            span = frame_count
        groups = [
            (first, min(frame_count - 1, first + span - 1))
            for first in range(0, frame_count, span)
        ]
        resolved_label = mode

    budget = max(0, int(total_k))
    # Capacity of a group = total number of patches across its frames.
    capacities = [
        sum(int(g.size) for g in grids[first:last + 1])
        for first, last in groups
    ]
    shares = split_budget_evenly(budget, max(1, len(groups)))

    # Cap every share at its group's capacity and pool the surplus.
    surplus = 0
    for gi, capacity in enumerate(capacities):
        over = shares[gi] - capacity
        if over > 0:
            surplus += over
            shares[gi] = capacity

    # Hand the surplus back round-robin until it is spent or every group
    # is full.
    while surplus > 0:
        moved = False
        for gi, capacity in enumerate(capacities):
            if surplus > 0 and shares[gi] < capacity:
                shares[gi] += 1
                surplus -= 1
                moved = True
        if not moved:
            break

    # Start from all-empty masks, then fill in each group's selection.
    out_masks = [np.zeros(g.shape, dtype=np.uint8) for g in grids]
    actual_total = 0
    for (first, last), quota in zip(groups, shares):
        group_masks, group_picked = global_topk_masks(grids[first:last + 1], quota)
        for frame_idx, group_mask in enumerate(group_masks, start=first):
            out_masks[frame_idx] = group_mask
        actual_total += group_picked
    return out_masks, actual_total, groups, resolved_label
def faded_background(frame_bgr: np.ndarray, fade: float = 0.55) -> np.ndarray:
    """Convert to gray-white wash: gray * (1-fade) + white * fade.

    `fade` is clamped to [0, 1]; values outside that range would push the
    blend outside 0..255 and overflow in the uint8 cast.
    """
    fade = min(1.0, max(0.0, float(fade)))
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    gray_bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR).astype(np.float32)
    white = np.full_like(gray_bgr, 255.0)
    out = gray_bgr * (1.0 - fade) + white * fade
    return out.astype(np.uint8)


def overlay_selection(
    frame_bgr: np.ndarray,
    mask_grid: np.ndarray,
    patch: int,
    outline: bool = True,
    fade: float = 0.55,
) -> np.ndarray:
    """Composite: kept patches keep color; dropped patches become gray-white.

    `mask_grid` is a [hb, wb] grid whose non-zero cells mark kept patches.
    Pixels of the frame that lie outside the area covered by the grid are
    treated as dropped. Optionally draws a two-tone outline around each kept
    patch. Returns a new image; `frame_bgr` is not modified.
    """
    h, w = frame_bgr.shape[:2]
    # Expand every grid cell to a patch x patch block of pixels.
    pix_mask = np.kron(mask_grid, np.ones((patch, patch), dtype=np.uint8))
    pix_mask = pix_mask[:h, :w]
    # Pad to the frame size so a frame larger than the grid does not break
    # the broadcast in np.where below.
    keep = np.zeros((h, w), dtype=bool)
    keep[: pix_mask.shape[0], : pix_mask.shape[1]] = pix_mask.astype(bool)

    bg = faded_background(frame_bgr, fade=float(fade))
    out = np.where(keep[..., None], frame_bgr, bg)

    if outline:
        # A two-tone stroke survives browser H.264/yuv420p encoding better
        # than a single 1 px saturated line.
        outer_thickness = 2 if patch >= 20 else 1
        for i, j in np.argwhere(mask_grid):
            y0, x0 = int(i) * patch, int(j) * patch
            y1, x1 = y0 + patch - 1, x0 + patch - 1
            cv2.rectangle(
                out, (x0, y0), (x1, y1),
                PATCH_OUTLINE_OUTER_BGR,
                outer_thickness,
                lineType=cv2.LINE_AA,
            )
            # Skip the inner stroke on patches too small to hold it.
            if patch >= 6 and (x1 - x0) >= 3 and (y1 - y0) >= 3:
                cv2.rectangle(
                    out, (x0 + 1, y0 + 1), (x1 - 1, y1 - 1),
                    PATCH_OUTLINE_INNER_BGR,
                    1,
                    lineType=cv2.LINE_AA,
                )
    return out


def _normalize_scores(grids: List[np.ndarray], pct: float = 99.0) -> np.ndarray:
    """Stack into [N, hb, wb], shift by per-video min, divide by global pct.

    Using the percentile (instead of max) suppresses outlier patches the same
    way codec_tools does with bitcost_pct=99. The result is a float32 array
    clipped to [0, 1]. `pct` is clamped to [0, 100]; an empty `grids` list
    yields an empty array instead of raising.
    """
    if not grids:
        return np.zeros((0, 0, 0), dtype=np.float32)
    pct = min(100.0, max(0.0, float(pct)))
    arr = np.stack(grids, axis=0).astype(np.float32)
    arr = arr - arr.min()
    # Plain Python floats keep the division in float32 on every NumPy version.
    cap = float(np.percentile(arr, pct)) if arr.size else 1.0
    if cap <= 1e-8:
        # Percentile collapsed to zero: fall back to the max, or 1 if flat.
        cap = float(arr.max() or 1.0)
    return np.clip(arr / cap, 0.0, 1.0)
def overlay_heatmap(
    frame_bgr: np.ndarray,
    score_grid: np.ndarray,
    patch: int,
    alpha: float = 0.55,
) -> np.ndarray:
    """Render a continuous JET heatmap of patch scores blended over the frame.

    Low score = blue, high score = red. `score_grid` is in [0, 1]. The
    expanded grid must cover the whole frame, i.e. the frame dimensions are
    expected to be multiples of `patch`.
    """
    h, w = frame_bgr.shape[:2]
    score = (np.clip(score_grid, 0.0, 1.0) * 255.0).astype(np.uint8)
    # Expand each grid cell to a patch x patch block of pixels.
    pix = np.kron(score, np.ones((patch, patch), dtype=np.uint8))
    pix = pix[:h, :w]
    heat = cv2.applyColorMap(pix, cv2.COLORMAP_JET)
    return cv2.addWeighted(frame_bgr, 1.0 - alpha, heat, alpha, 0.0)


def overlay_sbs(
    frame_bgr: np.ndarray,
    mask_grid: np.ndarray,
    score_grid: np.ndarray,
    patch: int,
    alpha: float = 0.55,
    fade: float = 0.55,
) -> np.ndarray:
    """Side-by-side: [selection | heatmap] with a thin separator."""
    left = overlay_selection(frame_bgr, mask_grid, patch, outline=True, fade=fade)
    right = overlay_heatmap(frame_bgr, score_grid, patch, alpha=alpha)
    h, w = left.shape[:2]
    sep = np.full((h, 4, 3), 30, dtype=np.uint8)
    sbs = np.concatenate([left, sep, right], axis=1)
    cv2.putText(sbs, "selection", (8, 22), cv2.FONT_HERSHEY_SIMPLEX,
                0.6, (255, 255, 255), 2, cv2.LINE_AA)
    # The right-hand label starts just past the 4 px separator.
    cv2.putText(sbs, "heatmap", (w + 12, 22), cv2.FONT_HERSHEY_SIMPLEX,
                0.6, (255, 255, 255), 2, cv2.LINE_AA)
    return sbs


def write_mp4(frames: List[np.ndarray], path: str, fps: float) -> None:
    """Write H.264 mp4 via imageio-ffmpeg's bundled ffmpeg (browser-friendly).

    Frames are BGR uint8 images that must all share the size of the first
    one. yuv420p needs even dimensions, so an odd width/height is cropped by
    one pixel; the size passed to ffmpeg is the cropped size, keeping the
    declared geometry and the bytes written in agreement.

    Raises:
        ValueError: if `frames` is empty, a frame is smaller than 2x2, or a
            frame's size differs from the first frame's.
        RuntimeError: if ffmpeg exits with a non-zero status or closes its
            input early.
    """
    if not frames:
        raise ValueError("no frames to write")
    h, w = frames[0].shape[:2]
    even_h, even_w = h - (h % 2), w - (w % 2)
    if even_h <= 0 or even_w <= 0:
        raise ValueError(f"frames too small to encode: {w}x{h}")

    ff = imageio_ffmpeg.get_ffmpeg_exe()
    cmd = [
        ff, "-y", "-loglevel", "error",
        "-f", "rawvideo", "-vcodec", "rawvideo",
        "-s", f"{even_w}x{even_h}", "-pix_fmt", "bgr24",
        "-r", f"{fps:.3f}",
        "-i", "-",
        "-an",
        "-vcodec", "libx264", "-pix_fmt", "yuv420p",
        "-preset", "veryfast", "-crf", "23",
        "-movflags", "+faststart",
        path,
    ]
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        pipe_broken = False
        try:
            for f in frames:
                if f.shape[:2] != (h, w):
                    raise ValueError(
                        f"frame size {f.shape[1]}x{f.shape[0]} differs from {w}x{h}"
                    )
                proc.stdin.write(
                    np.ascontiguousarray(f[:even_h, :even_w]).tobytes()
                )
            proc.stdin.close()
        except BrokenPipeError:
            # ffmpeg went away mid-stream; its stderr explains why, so fall
            # through and report that instead of the bare pipe error.
            pipe_broken = True
        err = proc.stderr.read().decode("utf-8", errors="ignore")
        rc = proc.wait()
        if rc != 0 or pipe_broken:
            raise RuntimeError(f"ffmpeg failed (rc={rc}): {err}")
    finally:
        # Never leave a stray encoder behind on an error path.
        if proc.poll() is None:
            proc.kill()
            proc.wait()
def _build_ippp_canvas(
    frames: List[np.ndarray],
    masks: List[np.ndarray],
    i_idx: int,
    p_range: range,
    patch: int,
) -> Tuple[np.ndarray, int]:
    """Assemble one GOP canvas: a full I-frame followed by P sections.

    Layout, top to bottom:
      1. `frames[i_idx]`, cropped to whole patches, copied as-is.
      2. For every index in `p_range` (stopping at the end of `frames`):
         a 2 px divider, then that frame's selected patches packed
         left-to-right, top-to-bottom in row-major mask order. A frame with
         no selected patches still gets one empty row.

    So GOP=4 becomes I|P|P|P, GOP=5 becomes I|P|P|P|P, etc.

    Returns (canvas, n_patches) where n_patches counts the P-frame patches
    packed under the I-frame.
    """
    key_frame = frames[i_idx]
    grid_rows = key_frame.shape[0] // patch
    grid_cols = key_frame.shape[1] // patch
    key_h, canvas_w = grid_rows * patch, grid_cols * patch
    divider_px = 2

    sections: List[np.ndarray] = []
    packed_total = 0
    for k in p_range:
        if k >= len(frames):
            break
        frame, mask = frames[k], masks[k]
        # np.argwhere walks the mask in row-major order.
        tiles = [
            frame[r * patch:(r + 1) * patch, c * patch:(c + 1) * patch]
            for r, c in np.argwhere(mask)
        ]
        packed_total += len(tiles)
        rows_needed = max(1, int(math.ceil(len(tiles) / max(1, grid_cols))))
        section = np.full((rows_needed * patch, canvas_w, 3), 246, dtype=np.uint8)
        for slot, tile in enumerate(tiles):
            top = (slot // grid_cols) * patch
            left = (slot % grid_cols) * patch
            section[top:top + patch, left:left + patch] = tile
        sections.append(section)

    canvas_h = key_h + sum(divider_px + s.shape[0] for s in sections)
    canvas = np.full((canvas_h, canvas_w, 3), 250, dtype=np.uint8)
    canvas[:key_h, :canvas_w] = key_frame[:key_h, :canvas_w]
    offset = key_h
    for section in sections:
        canvas[offset:offset + divider_px, :] = (99, 102, 241)
        offset += divider_px
        canvas[offset:offset + section.shape[0], :canvas_w] = section
        offset += section.shape[0]
    return canvas, packed_total


def pack_canvases_per_group(
    frames: List[np.ndarray],
    masks: List[np.ndarray],
    groups: List[Tuple[int, int]],
    patch: int,
    target_canvases: int = 1,
) -> Tuple[List[np.ndarray], List[Tuple[int, int, int]], int]:
    """Build exactly one IPPP canvas for every GOP group.

    The first frame of a group is kept whole as the I-frame and each later
    frame contributes its own packed P section underneath.
    `target_canvases` exists only for API compatibility and is ignored.
    Groups that start beyond the last frame are skipped. When nothing can be
    packed, a single white `patch` x `patch` placeholder is returned with
    the range ``(0, 0, 0)`` and a total of 0.

    Returns:
        canvases        — one array per packed group.
        sub_ranges      — (group_idx, start, end) tuples parallel to
                          `canvases`, for captions / debugging.
        total_selected  — for every canvas, the full I-frame patch grid plus
                          the selected P-frame patches.
    """
    def _placeholder():
        blank = np.full((patch, patch, 3), 255, dtype=np.uint8)
        return [blank], [(0, 0, 0)]

    if not groups or not frames:
        blank_canvases, blank_ranges = _placeholder()
        return blank_canvases, blank_ranges, 0

    canvases: List[np.ndarray] = []
    sub_ranges: List[Tuple[int, int, int]] = []
    total_selected = 0
    for g_idx, (first, last) in enumerate(groups):
        if first >= len(frames):
            continue
        canvas, packed = _build_ippp_canvas(
            frames, masks,
            i_idx=first,
            p_range=range(first + 1, last + 1),
            patch=patch,
        )
        canvases.append(canvas)
        sub_ranges.append((g_idx, first, last))
        i_rows = frames[first].shape[0] // patch
        i_cols = frames[first].shape[1] // patch
        total_selected += i_rows * i_cols + packed

    if not canvases:
        canvases, sub_ranges = _placeholder()
    return canvases, sub_ranges, total_selected
def make_charts(
    grids: List[np.ndarray],
    masks: List[np.ndarray],
    codec_frame_ids: List[int],
    uniform_frame_ids: List[int],
    uniform_requested_frames: int,
    uniform_full_frame_patches: int,
    fps: float,
    total_duration_sec: float,
    total_patches_budget: int,
    saliency_signal: str,
    groups: List[Tuple[int, int]] = None,
    gop_label: str = "global",
):
    """One overlaid step chart: cumulative patches selected vs time, for the
    codec saliency curve and a uniform full-frame sampling baseline.

      X = time (s)
      Y = cumulative count of selected patches

    The codec curve rises in bursts where saliency is high; the uniform
    baseline rises in equal steps because every sampled full frame
    contributes `uniform_full_frame_patches` patches.

    `grids` is accepted for API compatibility and is not used for plotting.
    `fps` falls back to 25 when missing or non-positive. The x-axis spans
    `total_duration_sec`, or the latest sampled frame time when that is not
    positive, and never less than a non-zero width. Returns the matplotlib
    Figure; the caller owns it.
    """
    # Use a larger physical canvas plus high DPI so Gradio/HF does not
    # upscale a small rasterized plot into a blurry chart.
    fig, ax = plt.subplots(
        figsize=(11.4, 4.8),
        dpi=220,
        constrained_layout=True,
    )

    fps_safe = float(fps) if fps and fps > 0 else 25.0

    all_frame_ids = list(codec_frame_ids) + list(uniform_frame_ids)
    if total_duration_sec and total_duration_sec > 0:
        duration = float(total_duration_sec)
    elif all_frame_ids:
        duration = max(all_frame_ids) / fps_safe
    else:
        duration = 1.0
    if duration <= 0:
        # Only frame 0 was sampled: keep the x-axis from collapsing to a point.
        duration = 1.0

    # ─── Build step curves ──────────────────────────────────────────────
    def _step(xs, cum):
        """Return (xx, yy) step-plot vertices through the points (xs, cum),
        starting at the origin and extended flat to `duration`."""
        if not xs:
            return [0.0, duration], [0.0, 0.0]
        xx, yy = [0.0], [0.0]
        prev = 0.0
        for x, c in zip(xs, cum):
            # Vertical riser at x: from the previous level up to c.
            xx.extend([x, x])
            yy.extend([prev, c])
            prev = c
        xx.append(duration)
        yy.append(prev)
        return xx, yy

    times = [fid / fps_safe for fid in codec_frame_ids]
    counts = [int(m.sum()) for m in masks]
    codec_cum = list(np.cumsum(counts)) if counts else []
    codec_total = int(codec_cum[-1]) if codec_cum else 0
    xx_c, yy_c = _step(times, codec_cum)

    # Uniform baseline: evenly sample COMPLETE frames from the same time
    # window, no codec saliency involved. Each sampled frame contributes a
    # fixed patch cost, independent of visualization resize.
    budget = int(total_patches_budget)
    requested_uniform = max(0, int(uniform_requested_frames))
    full_frame_patches = max(1, int(uniform_full_frame_patches))
    n_uniform = len(uniform_frame_ids)
    uni_per_step = [full_frame_patches for _ in uniform_frame_ids]
    uni_cum = list(np.cumsum(uni_per_step)) if uni_per_step else []
    uni_total = int(uni_cum[-1]) if uni_cum else 0
    uni_times = [fid / fps_safe for fid in uniform_frame_ids]
    xx_u, yy_u = _step(uni_times, uni_cum)

    # ─── Plot ───────────────────────────────────────────────────────────
    # Per-frame breakdown for the legend.
    if counts:
        c_min, c_max = int(min(counts)), int(max(counts))
        c_avg = codec_total / max(1, len(counts))
        codec_lbl = (
            f"Codec · {saliency_signal}  ({codec_total:,} total · "
            f"per-frame min {c_min} · avg {c_avg:.1f} · max {c_max})"
        )
    else:
        codec_lbl = f"Codec · {saliency_signal}  ({codec_total:,} patches)"

    if uni_per_step:
        unused = max(0, budget - uni_total)
        frame_part = (
            f"{n_uniform}/{requested_uniform} frames fit budget"
            if requested_uniform != n_uniform else
            f"{n_uniform} frames"
        )
        uni_lbl = (
            f"Uniform full frames  ({frame_part} · {full_frame_patches}/frame · "
            f"{uni_total:,} total"
            + (f" · {unused:,} budget unused" if unused else "")
            + ")"
        )
    else:
        uni_lbl = (
            f"Uniform full frames  (0/{requested_uniform} frames fit budget "
            f"{budget:,}; need {full_frame_patches} patches/frame)"
        )

    ax.fill_between(xx_c, yy_c, step=None, alpha=0.12, color="#4f46e5")
    ax.plot(xx_c, yy_c, color="#4f46e5", linewidth=2.8, label=codec_lbl)
    ax.fill_between(xx_u, yy_u, step=None, alpha=0.10, color="#06b6d4")
    ax.plot(
        xx_u, yy_u, color="#06b6d4", linewidth=2.8,
        linestyle="--", label=uni_lbl,
    )

    # Budget reference line
    ax.axhline(budget, color="#94a3b8", linestyle=":", linewidth=1.1, alpha=0.85)
    ax.text(
        duration * 0.995, budget * 1.015, f"budget {budget:,}",
        color="#475569", fontsize=10.0, va="bottom", ha="right",
    )

    # Group boundaries
    if groups and len(groups) > 1 and times:
        for (_, end_idx) in groups[:-1]:
            if end_idx >= len(times):
                # No timestamp for this frame; nothing sensible to draw.
                continue
            if end_idx + 1 < len(times):
                # Draw the boundary halfway between the two adjacent samples.
                bx = (times[end_idx] + times[end_idx + 1]) / 2.0
            else:
                bx = times[end_idx]
            ax.axvline(
                bx, color="#cbd5e1", linestyle=(0, (3, 3)),
                alpha=0.55, linewidth=1.0,
            )

    n_groups = len(groups) if groups else 1
    gop_str = gop_label if gop_label in ("global", "codec-stream") else f"GOP={gop_label}"
    ax.set_title(
        f"Cumulative patches selected over time  ·  {saliency_signal}  ·  "
        f"{gop_str} ({n_groups} groups)",
        fontsize=13, color="#1e293b",
    )
    ax.set_xlabel("time (s)", fontsize=11)
    ax.set_ylabel("# patches selected (cumulative)", fontsize=11)
    ax.set_xlim(-duration * 0.02, duration * 1.02)
    ymax = max(budget, codec_total, uni_total) * 1.08 + 1
    ax.set_ylim(0, ymax)
    ax.tick_params(axis="both", labelsize=10)
    ax.grid(True, alpha=0.25, linestyle="--", axis="y")
    ax.spines[["top", "right"]].set_visible(False)
    ax.legend(loc="upper left", fontsize=10, frameon=False, handlelength=2.8)

    fig.patch.set_facecolor("white")
    # NOTE(review): pyplot keeps a reference to every figure it creates; if
    # the caller never closes this one, long-running sessions accumulate
    # figures. Confirm whether the consumer closes it.
    return fig
def _gop_group_info(s: int, e: int, fids: List[int], masks: List[np.ndarray]) -> dict:
    """Describe one GOP group (inclusive sample indices s..e) for the JSON report."""
    return {
        "start_frame_idx": int(s),
        "end_frame_idx": int(e),
        "start_sample_idx": int(s),
        "end_sample_idx": int(e),
        "start_source_frame_id": int(fids[s]) if s < len(fids) else None,
        "end_source_frame_id": int(fids[e]) if e < len(fids) else None,
        "source_frame_ids": [int(fids[i]) for i in range(s, e + 1)],
        "n_frames": int(e - s + 1),
        "structure_label": " ".join(["I"] + ["P"] * max(0, e - s)),
        "i_frame_source_id": int(fids[s]) if s < len(fids) else None,
        "p_source_frame_ids": [int(fids[i]) for i in range(s + 1, e + 1)],
        "p_frame_count": int(max(0, e - s)),
        "p_frame_patch_counts": [int(masks[i].sum()) for i in range(s + 1, e + 1)],
        "p_frame_selected_patches": int(sum(int(m.sum()) for m in masks[s + 1:e + 1])),
        "selected": int(sum(int(m.sum()) for m in masks[s:e + 1])),
    }


def _canvas_info(
    index: int,
    canvas: np.ndarray,
    sub_range: Tuple[int, int, int],
    fids: List[int],
    masks: List[np.ndarray],
) -> dict:
    """Describe one packed canvas for the JSON report."""
    g_idx, ss, ee = sub_range
    return {
        "index": index,
        "size": f"{canvas.shape[1]}x{canvas.shape[0]}",
        "group": int(g_idx),
        "sub_range": [ss, ee],
        "sampled_indices": [int(x) for x in range(ss, ee + 1)],
        "source_frame_ids": [int(fids[x]) for x in range(ss, ee + 1)],
        "structure_label": " ".join(["I"] + ["P"] * max(0, ee - ss)),
        "i_frame_source_id": int(fids[ss]),
        "p_source_frame_ids": [int(fids[x]) for x in range(ss + 1, ee + 1)],
        "p_frame_count": int(max(0, ee - ss)),
        "p_frame_patch_counts": [int(masks[x].sum()) for x in range(ss + 1, ee + 1)],
        "p_frame_selected_patches": int(
            sum(int(m.sum()) for m in masks[ss + 1:ee + 1])
        ),
        "structure": "Full I-frame on top; one packed P section per "
                     "later frame, in time order.",
    }


def process(
    video_path,
    sample_frames: int,
    patch_size: int,
    total_patches: int,
    max_pixels: int,
    viz_mode: str = "selection",
    heatmap_alpha: float = 0.55,
    start_sec: float = 0.0,
    end_sec: float = 0.0,
    saliency_signal: str = "gradient",
    score_log_scale: bool = False,
    bitcost_pct: float = 99.0,
    fade_strength: float = 0.55,
    gop: str = "global",
    target_canvases: int = 1,
    progress=gr.Progress(track_tqdm=False),
):
    """Run the full codec-view pipeline on one video.

    Steps: read metadata, sample frame ids (optionally inside the
    [start_sec, end_sec] window), decode and resize, score patches, select
    patches under the GOP grouping, render the visualization video, pack one
    canvas image per group, and build the cumulative-patches chart.

    Returns a 4-tuple ``(vis_video_path, canvas_items, info_json, figure)``
    where `canvas_items` is a list of ``(png_path, caption)`` pairs. On
    failure the first and last items are None, the list is empty and the
    third item carries the message. `target_canvases` is accepted for API
    compatibility and ignored. Output files are written to a fresh temporary
    directory that is left in place for the caller to serve.
    """
    if not video_path:
        return None, [], "Please upload a video.", None

    t0 = time.time()
    progress(0.05, desc="Reading metadata")
    meta = video_metadata(video_path)

    def _fail(message: str):
        # Shared shape for every error return.
        payload = json.dumps(
            {"error": message, "metadata": meta},
            indent=2, ensure_ascii=False,
        )
        return None, [], payload, None

    total = meta.get("total_frames") or 0
    if total <= 0:
        return _fail("Could not read frame count.")

    progress(0.10, desc="Sampling frames")
    # A negative request is treated as zero so it produces the normal
    # "failed to decode" report instead of an exception from np.linspace.
    n_samples = max(0, int(sample_frames))
    fps = float(meta.get("fps") or 0.0)
    s_sec = max(0.0, float(start_sec or 0.0))
    e_sec = float(end_sec or 0.0)
    if fps > 0 and (s_sec > 0 or e_sec > 0):
        f_start = max(0, int(round(s_sec * fps)))
        f_end = (
            min(total - 1, int(round(e_sec * fps)) - 1)
            if e_sec > 0 else total - 1
        )
        if f_end <= f_start:
            # Empty or inverted window: run to the end of the video instead.
            f_end = total - 1
        window_total = f_end - f_start + 1
        if n_samples >= window_total:
            fids = list(range(f_start, f_end + 1))
        else:
            fids = [
                int(round(x))
                for x in np.linspace(f_start, f_end, n_samples)
            ]
    else:
        f_start, f_end = 0, total - 1
        fids = sample_frame_ids(total, n_samples)

    raw = decode_frames(video_path, fids)
    if not raw:
        return _fail("Failed to decode frames.")
    if len(raw) != len(fids):
        # decode_frames drops unreadable frames without saying which ones
        # (reported frame counts are often optimistic). Re-decode id by id
        # so `fids` stays index-aligned with the frames; every caption,
        # report entry and chart timestamp below relies on that alignment.
        kept_ids: List[int] = []
        kept_frames: List[np.ndarray] = []
        for fid in fids:
            got = decode_frames(video_path, [fid])
            if got:
                kept_ids.append(fid)
                kept_frames.append(got[0])
        if not kept_frames:
            return _fail("Failed to decode frames.")
        fids, raw = kept_ids, kept_frames

    patch = int(patch_size)

    progress(0.25, desc="smart_resize")
    resized = [smart_resize(f, int(max_pixels), patch) for f in raw]
    # Force every frame to the first frame's size so grids can be stacked.
    th, tw = resized[0].shape[:2]
    resized = [
        cv2.resize(f, (tw, th), interpolation=cv2.INTER_AREA)
        if f.shape[:2] != (th, tw) else f
        for f in resized
    ]

    progress(0.40, desc=f"Scoring patches ({saliency_signal})")
    grids = compute_score_grids(resized, patch, saliency_signal)
    if score_log_scale:
        grids = [np.log1p(np.clip(g, 0.0, None)) for g in grids]
    masks, actual_selected, groups, gop_resolved = grouped_topk_masks(
        grids, int(total_patches), str(gop or "global"),
    )
    norm_scores = _normalize_scores(grids, pct=float(bitcost_pct))

    mode = (viz_mode or "selection").lower()
    if mode not in ("selection", "heatmap", "sbs"):
        mode = "selection"

    progress(0.60, desc=f"Rendering {mode} video")
    if mode == "heatmap":
        vis = [
            overlay_heatmap(f, s, patch, alpha=float(heatmap_alpha))
            for f, s in zip(resized, norm_scores)
        ]
    elif mode == "sbs":
        vis = [
            overlay_sbs(
                f, m, s, patch,
                alpha=float(heatmap_alpha),
                fade=float(fade_strength),
            )
            for f, m, s in zip(resized, masks, norm_scores)
        ]
    else:
        vis = [
            overlay_selection(f, m, patch, fade=float(fade_strength))
            for f, m in zip(resized, masks)
        ]

    out_dir = tempfile.mkdtemp(prefix="codec_view_")
    vis_path = os.path.join(out_dir, f"{mode}_vis.mp4")
    # Play back at a quarter of the source rate, kept within 2..8 fps.
    vis_fps = max(2.0, min(8.0, (meta.get("fps") or 25.0) / 4.0))
    write_mp4(vis, vis_path, vis_fps)

    progress(0.85, desc="Packing canvases (IPPP)")
    canvases, sub_ranges, n_selected = pack_canvases_per_group(
        resized, masks, groups, patch, target_canvases=1,
    )
    canvas_items: List[Tuple[str, str]] = []
    for idx, canv in enumerate(canvases):
        cp = os.path.join(out_dir, f"canvas_{idx:03d}.png")
        cv2.imwrite(cp, canv)
        g_idx, ss, ee = sub_ranges[idx] if idx < len(sub_ranges) else (0, idx, idx)
        src_start = int(fids[ss]) if ss < len(fids) else None
        src_end = int(fids[ee]) if ee < len(fids) else None
        p_frame_count = max(0, ee - ss)
        structure_label = " ".join(["I"] + ["P"] * p_frame_count)
        p_patch_count = int(sum(int(m.sum()) for m in masks[ss + 1:ee + 1]))
        caption = (
            f"Canvas {idx + 1}/{len(canvases)} · group {g_idx + 1} · "
            f"{structure_label} · sampled #{ss}-{ee} · src {src_start}-{src_end} · "
            f"I src#{src_start} + {p_patch_count} P patches from "
            f"{p_frame_count} frame{'s' if p_frame_count != 1 else ''}"
        )
        canvas_items.append((cp, caption))

    hb, wb = grids[0].shape
    uniform_full_frame_patches = patch * patch

    # Uniform full-frame sampling baseline: evenly sample complete frames
    # from the same time window, independent of codec saliency.
    requested_budget = int(total_patches)
    uniform_requested_frames = len(fids)
    uniform_frame_count = min(
        uniform_requested_frames,
        requested_budget // max(1, uniform_full_frame_patches),
    )
    uniform_frame_ids = sample_window_frame_ids(f_start, f_end, uniform_frame_count)
    uniform_total = int(len(uniform_frame_ids) * uniform_full_frame_patches)

    info = {
        "input": meta,
        "params": {
            "sample_frames": int(sample_frames),
            "patch_size": patch,
            "total_patches_budget": int(total_patches),
            "max_pixels": int(max_pixels),
            "start_sec": float(s_sec),
            "end_sec": float(e_sec) if e_sec > 0 else None,
            "saliency_signal": saliency_signal,
            "score_log_scale": bool(score_log_scale),
            "bitcost_pct": float(bitcost_pct),
            "fade_strength": float(fade_strength),
            "gop": gop_resolved,
            "canvas_policy": "one_canvas_per_group_with_per_frame_p_sections",
            "i_frame_policy": "first_frame_full_in_each_group",
        },
        "gop_groups": [_gop_group_info(s, e, fids, masks) for (s, e) in groups],
        "frame_window": {
            "first_decoded": int(f_start),
            "last_decoded": int(f_end),
            "codec_frame_ids": [int(x) for x in fids],
            "uniform_full_frame_ids": [int(x) for x in uniform_frame_ids],
        },
        "codec_per_frame_patches": [int(m.sum()) for m in masks],
        "uniform_baseline": {
            "mode": "uniform_full_frame_sampling",
            "requested_frames": int(uniform_requested_frames),
            "frames": int(len(uniform_frame_ids)),
            "patches_per_frame": int(uniform_full_frame_patches),
            "frame_ids": [int(x) for x in uniform_frame_ids],
            "requested_budget": requested_budget,
            "unused_budget": int(max(0, requested_budget - uniform_total)),
            "total_patches": uniform_total,
            "explanation": (
                "Uniformly sample complete frames from the same time window. "
                f"The baseline targets the same sampled-frame count as codec "
                f"({uniform_requested_frames}), but each full frame costs "
                f"{uniform_full_frame_patches} patches (= patch_size^2), so "
                f"only {len(uniform_frame_ids)} full "
                "frames may fit inside the requested budget."
            ),
        },
        "resized_frame_size": f"{tw}x{th}",
        "patch_grid_per_frame": f"{hb}x{wb} = {hb * wb} patches",
        "uniform_full_frame_patch_cost": int(uniform_full_frame_patches),
        "actual_selected_total": int(actual_selected),
        "total_selected_patches_incl_i_frames": int(n_selected),
        # pack_canvases_per_group returns `canvases` and `sub_ranges` as
        # parallel lists, so they can be walked together.
        "canvases": [
            _canvas_info(i, canv, rng, fids, masks)
            for i, (canv, rng) in enumerate(zip(canvases, sub_ranges))
        ],
        "n_canvases": int(len(canvases)),
        "vis_video_fps": round(vis_fps, 2),
        "viz_mode": mode,
        "heatmap_alpha": float(heatmap_alpha) if mode != "selection" else None,
        "score_normalization": f"shift-min, /p{bitcost_pct:.1f}, clip"
                               + (" (log1p applied)" if score_log_scale else ""),
        "elapsed_sec": round(time.time() - t0, 2),
    }

    progress(0.95, desc="Building charts")
    duration_sec = (total / fps) if fps > 0 else 0.0
    chart_fig = make_charts(
        grids, masks, fids, uniform_frame_ids,
        uniform_requested_frames, uniform_full_frame_patches,
        fps, duration_sec, int(total_patches),
        saliency_signal,
        groups=groups, gop_label=gop_resolved,
    )

    progress(1.0, desc="Done")
    return (
        vis_path,
        canvas_items,
        json.dumps(info, indent=2, ensure_ascii=False),
        chart_fig,
    )
0.0 chart_fig = make_charts( grids, masks, fids, uniform_frame_ids, uniform_requested_frames, uniform_full_frame_patches, fps, duration_sec, int(total_patches), saliency_signal, groups=groups, gop_label=gop_resolved, ) progress(1.0, desc="Done") return ( vis_path, canvas_items, json.dumps(info, indent=2, ensure_ascii=False), chart_fig, ) CUSTOM_CSS = """ :root, .gradio-container, .gradio-container.dark { --ovc-grad: linear-gradient(135deg, #4f46e5 0%, #2563eb 50%, #06b6d4 100%); --ovc-grad-soft: linear-gradient(135deg, rgba(79,70,229,0.10), rgba(6,182,212,0.10)); --ovc-ring: rgba(79,70,229,0.18); --ovc-ring-strong: rgba(79,70,229,0.30); --ovc-line-soft: rgba(148,163,184,0.18); --ovc-line: rgba(148,163,184,0.26); --ovc-line-strong: rgba(100,116,139,0.38); --ovc-line-accent-soft: rgba(79,70,229,0.18); --ovc-line-accent: rgba(79,70,229,0.28); --ovc-line-accent-strong: rgba(79,70,229,0.40); --ovc-line-cyan: rgba(6,182,212,0.18); --ovc-surface-tint: rgba(248,250,252,0.74); --ovc-surface-accent: rgba(79,70,229,0.035); --ovc-shadow-soft: 0 1px 2px rgba(15,23,42,0.04), 0 10px 30px rgba(15,23,42,0.03); --ovc-shadow-accent: 0 8px 28px rgba(79,70,229,0.08); } .gradio-container { max-width: 1480px !important; margin: 0 auto !important; padding-left: 10px !important; padding-right: 10px !important; } .ovc-main { gap: 18px !important; align-items: flex-start !important; } .ovc-bottom { gap: 16px !important; align-items: stretch !important; } @keyframes ovc-shift { 0% { background-position: 0% 50%; } 50% { background-position: 100% 50%; } 100% { background-position: 0% 50%; } } @keyframes ovc-pulse { 0%, 100% { box-shadow: 0 6px 18px rgba(37, 99, 235, 0.32); } 50% { box-shadow: 0 8px 26px rgba(37, 99, 235, 0.50); } } @keyframes ovc-fade-in { from { opacity: 0; transform: translateY(4px); } to { opacity: 1; transform: translateY(0); } } /* Hero */ #ovc-hero { text-align: center; padding: 44px 16px 22px; border-radius: 22px; background: radial-gradient(120% 80% at 50% -10%, 
rgba(79,70,229,0.20), transparent 60%), linear-gradient(180deg, rgba(79,70,229,0.06), rgba(6,182,212,0.03)), repeating-linear-gradient(0deg, rgba(99,102,241,0.05) 0 1px, transparent 1px 28px), repeating-linear-gradient(90deg, rgba(99,102,241,0.05) 0 1px, transparent 1px 28px); border: 1px solid var(--ovc-line-accent-soft); box-shadow: 0 10px 30px rgba(15,23,42,0.04); margin-bottom: 18px; position: relative; overflow: hidden; } #ovc-hero::after { content: ""; position: absolute; inset: auto -20% -40% -20%; height: 60%; background: radial-gradient(60% 80% at 50% 0%, rgba(6,182,212,0.22), transparent 70%); pointer-events: none; } #ovc-hero h1 { font-size: 2.7rem; font-weight: 800; background: var(--ovc-grad); background-size: 200% 200%; animation: ovc-shift 9s ease-in-out infinite; -webkit-background-clip: text; background-clip: text; color: transparent; margin: 0 0 6px; letter-spacing: -0.028em; line-height: 1.04; } #ovc-hero p.tagline { font-size: 1.05rem; color: var(--body-text-color-subdued); margin: 0 auto 16px; max-width: 760px; line-height: 1.6; } .ovc-links { display: flex; flex-wrap: wrap; gap: 10px; justify-content: center; margin: 14px auto 6px; position: relative; z-index: 1; } .ovc-links a { text-decoration: none; font-weight: 600; font-size: 0.9rem; padding: 7px 14px; border-radius: 999px; background: var(--background-fill-primary, #fff); border: 1px solid var(--ovc-line-accent-soft); color: #4338ca; transition: transform 0.12s ease, box-shadow 0.18s ease, background 0.18s ease, color 0.18s ease, border-color 0.18s ease; display: inline-flex; align-items: center; box-shadow: 0 1px 2px rgba(15,23,42,0.04); } .ovc-links a:hover { background: var(--ovc-grad); color: #fff; border-color: transparent; transform: translateY(-1px); box-shadow: 0 8px 20px rgba(79,70,229,0.18); } .gradio-container.dark .ovc-links a { background: rgba(30,41,59,0.7); color: #c7d2fe; border-color: var(--ovc-line-accent-strong); } /* Cards */ .ovc-card { border-radius: 16px 
!important; padding: 16px 18px !important; border: 1px solid var(--ovc-line-soft) !important; background: var(--background-fill-primary) !important; box-shadow: var(--ovc-shadow-soft); transition: box-shadow 0.18s ease, border-color 0.18s ease, transform 0.18s ease; animation: ovc-fade-in 0.32s ease-out; } .ovc-card:hover { border-color: var(--ovc-line) !important; box-shadow: 0 8px 24px rgba(15,23,42,0.06); } .ovc-card + .ovc-card { margin-top: 2px; } /* Primary outputs: subtle accent ring + lift */ .ovc-card-primary { border: 1px solid var(--ovc-line-accent-soft) !important; background: linear-gradient(180deg, rgba(79,70,229,0.028), rgba(6,182,212,0.014)), var(--background-fill-primary) !important; box-shadow: inset 0 0 0 1px rgba(255,255,255,0.65), var(--ovc-shadow-accent) !important; } .ovc-card-primary:hover { border-color: var(--ovc-line-accent) !important; box-shadow: inset 0 0 0 1px rgba(255,255,255,0.72), 0 10px 32px rgba(79,70,229,0.10) !important; } .ovc-card h3 { display: inline-flex; align-items: center; gap: 8px; font-size: 0.74rem !important; font-weight: 700 !important; text-transform: uppercase; letter-spacing: 0.10em; color: #3730a3 !important; background: rgba(79,70,229,0.06); border: 1px solid rgba(79,70,229,0.10); padding: 4px 10px !important; border-radius: 999px; margin: 0 0 12px !important; } .ovc-card h3::before { content: ""; display: inline-block; width: 6px; height: 6px; border-radius: 50%; background: var(--ovc-grad); transform: translateY(0); } /* Run button */ #ovc-run button { width: 100%; height: 54px !important; font-size: 1.06rem !important; font-weight: 700 !important; letter-spacing: 0.01em; background: var(--ovc-grad) !important; background-size: 200% 200% !important; animation: ovc-shift 6s ease-in-out infinite, ovc-pulse 2.6s ease-in-out infinite; border: none !important; color: #fff !important; border-radius: 14px !important; transition: transform 0.06s ease; } #ovc-run button:hover { transform: translateY(-1px); 
animation-play-state: paused; } #ovc-run button:active { transform: translateY(0); } /* Preset buttons */ .ovc-preset button { background: var(--ovc-grad-soft) !important; color: #4338ca !important; border: 1px solid rgba(79,70,229,0.16) !important; border-radius: 10px !important; font-weight: 600 !important; transition: all 0.15s ease; } .ovc-preset button:hover { background: var(--ovc-grad) !important; color: #fff !important; border-color: transparent !important; } /* Footer */ #ovc-footer { text-align: center; color: var(--body-text-color-subdued); font-size: 0.80rem; padding: 22px 8px 10px; margin-top: 14px; border-top: 1px solid rgba(100,116,139,0.22); } #ovc-footer code { background: rgba(79,70,229,0.08); padding: 1px 6px; border-radius: 4px; } /* Tighter spacing for sliders inside cards */ .ovc-card .gradio-slider { margin-bottom: 4px !important; } .ovc-card .gradio-number, .ovc-card .gradio-radio, .ovc-card .gradio-checkbox, .ovc-card .gradio-code, .ovc-card .gradio-gallery, .ovc-card .gradio-video, .ovc-card .gradio-plot { width: 100% !important; } /* Tame Gradio's dark default placeholders inside our cards: blanket-override any background on the inner wrappers, then paint a brand-tinted gradient on the canonical containers. This lights up the empty Video/Image/Plot zones so they no longer look like black holes. 
*/ .ovc-card .video-container, .ovc-card .image-container, .ovc-card .image-frame, .ovc-card .preview, .ovc-card .plot-container, .ovc-card .empty, .ovc-card video, .ovc-card [data-testid="video"], .ovc-card [data-testid="image"], .ovc-card .icon-button, .ovc-card .options, .ovc-card .source-selection, .ovc-card .upload-container { background: transparent !important; background-color: transparent !important; } .ovc-card .container, .ovc-card .wrap, .ovc-card .block, .ovc-card fieldset, .ovc-card [data-testid="block"], .ovc-card [data-testid="video"], .ovc-card [data-testid="image"], .ovc-card [data-testid="plot"], .ovc-card [data-testid="file-upload"], .ovc-card [data-testid="upload"], .ovc-card .video-container, .ovc-card .image-container, .ovc-card .plot-container { border-radius: 12px !important; } .ovc-card .block, .ovc-card fieldset, .ovc-card [data-testid="block"], .ovc-card [data-testid="video"], .ovc-card [data-testid="image"], .ovc-card [data-testid="plot"], .ovc-card [data-testid="file-upload"], .ovc-card [data-testid="upload"], .ovc-card .wrap, .ovc-card .container { border: 1px solid transparent !important; box-shadow: none !important; outline: none !important; background: transparent !important; } .ovc-card .video-container, .ovc-card .image-container, .ovc-card .plot-container, .ovc-card-primary .video-container, .ovc-card-primary .image-container, .ovc-card-primary .plot-container { background: linear-gradient(180deg, rgba(79,70,229,0.028), rgba(6,182,212,0.018)), var(--ovc-surface-tint) !important; border: 1px solid rgba(79,70,229,0.18) !important; box-shadow: inset 0 1px 0 rgba(255,255,255,0.68) !important; } .ovc-card .upload-container, .ovc-card .video-container, .ovc-card .image-container, .ovc-card .plot-container { width: 100% !important; min-width: 0 !important; } .ovc-card .upload-container, .ovc-card [data-testid="file-upload"] { min-height: 220px !important; background: linear-gradient(180deg, rgba(79,70,229,0.03), rgba(6,182,212,0.02)), 
rgba(248,250,252,0.72) !important; border: 1px solid rgba(79,70,229,0.16) !important; } .ovc-card .plot-container, .ovc-card [data-testid="plot"] { min-height: 300px !important; } .ovc-card-primary .video-container, .ovc-card [data-testid="video"] { min-height: 260px !important; } .ovc-card .gradio-video, .ovc-card .gradio-image, .ovc-card .gradio-plot { border-color: transparent !important; background: transparent !important; box-shadow: none !important; outline: none !important; } .ovc-card video, .ovc-card img, .ovc-card canvas, .ovc-card svg { border: none !important; outline: none !important; box-shadow: none !important; } /* Empty placeholder text inside Gradio components */ .ovc-card .empty, .ovc-card .empty p, .ovc-card .empty span { color: #94a3b8 !important; } /* Stats tile grid (rendered into a gr.HTML by render_stats_html) */ .ovc-stats { display: grid; grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); gap: 10px; } .ovc-stat { padding: 12px 14px; border-radius: 14px; background: linear-gradient(135deg, rgba(79,70,229,0.055), rgba(6,182,212,0.03)); border: 1px solid rgba(79,70,229,0.12); transition: transform 0.18s ease, box-shadow 0.18s ease; } .ovc-stat:hover { transform: translateY(-1px); box-shadow: 0 6px 18px rgba(79,70,229,0.10); } .ovc-stat .value { font-size: 1.55rem; font-weight: 800; background: var(--ovc-grad); -webkit-background-clip: text; background-clip: text; color: transparent; letter-spacing: -0.02em; line-height: 1.1; word-break: break-word; } .ovc-stat .label { font-size: 0.74rem; color: #64748b; text-transform: uppercase; letter-spacing: 0.06em; margin-top: 4px; font-weight: 600; } /* ─── Mobile / narrow viewport adjustments ─────────────────────────── */ @media (max-width: 768px) { .gradio-container { padding: 6px !important; } /* Force the controls/outputs row to stack vertically on phones */ .gradio-container .ovc-main { flex-direction: column !important; gap: 12px !important; } .gradio-container .ovc-bottom { 
flex-direction: column !important; gap: 12px !important; } .gradio-container .ovc-main > div { width: 100% !important; min-width: 0 !important; max-width: 100% !important; flex: 1 1 100% !important; } /* Hero scales down */ #ovc-hero { padding: 28px 14px 16px; border-radius: 16px; margin-bottom: 12px; } #ovc-hero h1 { font-size: 2.05rem; letter-spacing: -0.02em; } #ovc-hero p.tagline { font-size: 0.96rem; line-height: 1.5; margin-bottom: 12px; } .ovc-links { gap: 6px; margin-top: 10px; } .ovc-links a { font-size: 0.78rem; padding: 5px 10px; } /* Cards tighter */ .ovc-card { padding: 12px 14px !important; border-radius: 14px !important; } .ovc-card h3 { font-size: 0.70rem !important; margin-bottom: 8px !important; } /* Run button */ #ovc-run button { height: 48px !important; font-size: 0.98rem !important; } /* Stats tile sizing */ .ovc-stats { grid-template-columns: repeat(auto-fit, minmax(115px, 1fr)); gap: 8px; } .ovc-stat { padding: 10px 12px; } .ovc-stat .value { font-size: 1.25rem; } .ovc-stat .label { font-size: 0.68rem; } /* Outputs: shorter video so it does not dominate the screen */ .ovc-card video { max-height: 280px !important; } .ovc-card .upload-container, .ovc-card [data-testid="file-upload"] { min-height: 180px !important; } } @media (max-width: 480px) { #ovc-hero { padding: 22px 12px 14px; } #ovc-hero h1 { font-size: 1.7rem; } #ovc-hero p.tagline { font-size: 0.9rem; } /* Put each link on a row of two (browsers will pack 2 per row at this size) */ .ovc-links a { font-size: 0.74rem; padding: 4px 9px; } #ovc-run button { height: 46px !important; font-size: 0.94rem !important; } } """ THEME = gr.themes.Soft( primary_hue="indigo", secondary_hue="blue", neutral_hue="slate", font=[gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"], ).set( body_background_fill="*neutral_50", block_radius="14px", button_primary_background_fill="*primary_500", button_primary_background_fill_hover="*primary_600", ) HERO_HTML = """
<div id="ovc-hero">
  <h1>OneVision Encoder</h1>
  <p class="tagline">Codec-style patch saliency for video understanding — see which patches the encoder picks from your video and pack them into the canvas LLaVA-OneVision consumes.</p>
</div>
""" try: _GR_MAJOR = int(gr.__version__.split(".")[0]) except Exception: _GR_MAJOR = 4 _BLOCK_KW: dict = {"title": "OneVision Encoder"} _LAUNCH_KW: dict = {} if _GR_MAJOR >= 6: # In Gradio 6.0 these moved off Blocks(...) onto launch(...). _LAUNCH_KW["theme"] = THEME _LAUNCH_KW["css"] = CUSTOM_CSS else: _BLOCK_KW["theme"] = THEME _BLOCK_KW["css"] = CUSTOM_CSS VIZ_CHOICES = [ ("Selection — kept patches in color, others fade to gray-white", "selection"), ("Heatmap — full-frame JET overlay (blue=low, red=high)", "heatmap"), ("Both", "sbs"), ] SIGNAL_CHOICES = [ ("Gradient — intra-frame Sobel (sharp edges, textures, text)", "gradient"), ("Frame diff — inter-frame motion (movers, action)", "frame_diff"), ("Combined — 0.5·gradient + 0.5·frame_diff (general purpose)", "combined"), ] with gr.Blocks(**_BLOCK_KW) as demo: gr.HTML(HERO_HTML) with gr.Row(equal_height=False, elem_classes="ovc-main"): # ─── Controls (narrow column) ──────────────────────────────────── with gr.Column(scale=4, min_width=360): with gr.Group(elem_classes="ovc-card"): gr.Markdown("### Input") video_in = gr.Video(label="Video", sources=["upload"], height=260) with gr.Row(elem_classes="ovc-preset"): btn_demo = gr.Button( "Load demo video", size="sm", visible=os.path.exists(DEMO_VIDEO_PATH), ) with gr.Group(elem_classes="ovc-card"): gr.Markdown("### Pipeline") viz_mode = gr.Radio( VIZ_CHOICES, value="selection", label="Visualization mode", ) sample_frames = gr.Slider( 4, 64, value=32, step=1, label="Sampled frames", ) top_k = gr.Slider( 16, 16384, value=1024, step=16, label="Total patches budget (whole video)", info="The single budget shared across the whole video. 
" "The uniform full-frame baseline will fit as many " "complete frames as this budget allows, where one full " "frame costs patch_size^2 patches; the codec path spends " "the same budget on saliency-selected patches.", ) patch_size = gr.Radio( PATCH_CHOICES, value=14, label="Patch size (px)", ) gop = gr.Radio( [ ("GOP = 4 — fixed 4-frame groups", "4"), ("GOP = 8 — fixed 8-frame groups", "8"), ("GOP = 16 — fixed 16-frame groups", "16"), ("Codec-stream: adaptive groups by saliency energy", "dynamic"), ], value="8", label="GOP (group of pictures)", info="Splits sampled frames into GOP groups. Each group " "produces exactly one GOP canvas: the group's first " "frame stays whole as the I-frame, and each later " "frame gets its own P section below it. So GOP=4 " "means each group is I P P P. For fixed GOP=N, the " "number of packed canvases is ceil(sampled_frames / N). " "Example: 32 sampled frames with GOP=4 gives 8 " "canvases; 32 sampled frames with GOP=8 gives 4. " "Codec-stream mode adaptively groups by saliency " "energy, targeting roughly 8-64 sampled frames per group.", ) with gr.Accordion("Time window", open=False): with gr.Row(): start_sec = gr.Number(value=0.0, precision=2, label="Start (s)") end_sec = gr.Number(value=0.0, precision=2, label="End (s)") gr.Markdown( "Set both to 0 to use the full video.", ) with gr.Accordion("Saliency", open=False): saliency_signal = gr.Radio( SIGNAL_CHOICES, value="gradient", label="Scoring signal", ) score_log_scale = gr.Checkbox( value=False, label="Apply log1p to scores", info="Compresses dynamic range — brings up mid-energy patches.", ) bitcost_pct = gr.Slider( 80.0, 99.9, value=99.0, step=0.1, label="Heatmap normalization percentile", info="Higher = harder to saturate red; lower = more vivid.", ) with gr.Accordion("Visual style", open=False): heatmap_alpha = gr.Slider( 0.1, 0.9, value=0.55, step=0.05, label="Heatmap blend α", ) fade_strength = gr.Slider( 0.0, 0.9, value=0.55, step=0.05, label="Selection fade strength", ) 
max_pixels = gr.Slider( 40000, 400000, value=150000, step=10000, label="Max pixels per frame", ) with gr.Row(elem_id="ovc-run"): run_btn = gr.Button("Run pipeline", variant="primary") # ─── Outputs (wide column) ─────────────────────────────────────── with gr.Column(scale=7, min_width=560): with gr.Group(elem_classes="ovc-card ovc-card-primary"): gr.Markdown("### Patch selection visualization") vis_out = gr.Video( label="", show_label=False, autoplay=True, height=460, ) with gr.Group(elem_classes="ovc-card ovc-card-primary"): gr.Markdown("### Cumulative patches over time") gr.Markdown( "Indigo: codec method — selects patches " "within frames according to saliency, so the curve rises " "in bursts. Cyan (dashed): uniform full-frame " "sampling — evenly samples complete frames from the same " "time window, targeting the same sampled-frame count as " "codec when the budget allows. Each step costs " "patch_size^2 patches, regardless of the preview " "frame resolution. The dotted line marks the requested " "budget." ) chart_out = gr.Plot(label="", show_label=False) with gr.Row(equal_height=False, elem_classes="ovc-bottom"): with gr.Column(scale=7, min_width=420): with gr.Group(elem_classes="ovc-card"): gr.Markdown("### Packed canvases (one per GOP group)") gr.Markdown( "Each canvas is one GOP group rendered in " "I/P structure: the group's first frame is " "the I-frame kept whole on top, and each " "later frame gets its own packed P-frame " "section below in time order. Fixed GOP=N means " "one canvas per N sampled frames." ) canvas_out = gr.Gallery( label="", show_label=False, columns=2, rows=2, height=520, object_fit="contain", preview=True, ) with gr.Column(scale=5, min_width=340): with gr.Group(elem_classes="ovc-card"): gr.Markdown("### Raw JSON") gr.Markdown( "Full reproducible record of this run " "(params, frame ids, group spans). Collapsed by " "default — click to expand." 
) with gr.Accordion("Show full JSON", open=False): info_out = gr.Code( label="", language="json", lines=22, ) gr.HTML( '' ) run_btn.click( process, inputs=[ video_in, sample_frames, patch_size, top_k, max_pixels, viz_mode, heatmap_alpha, start_sec, end_sec, saliency_signal, score_log_scale, bitcost_pct, fade_strength, gop, ], outputs=[vis_out, canvas_out, info_out, chart_out], ) btn_demo.click( lambda: DEMO_PRESET, inputs=None, outputs=[ video_in, sample_frames, patch_size, top_k, max_pixels, viz_mode, heatmap_alpha, start_sec, end_sec, saliency_signal, score_log_scale, bitcost_pct, fade_strength, gop, ], ) if __name__ == "__main__": demo.launch( server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)), **_LAUNCH_KW, )