diff --git a/common/utils.py b/common/utils.py index 71b29a0c4eb4de..8fc87f7b59a7ac 100644 --- a/common/utils.py +++ b/common/utils.py @@ -11,6 +11,25 @@ LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change +class Timer: + """Simple lap timer for profiling sequential operations.""" + + def __init__(self): + self._start = self._lap = time.monotonic() + self._sections = {} + + def lap(self, name): + now = time.monotonic() + self._sections[name] = now - self._lap + self._lap = now + + @property + def total(self): + return time.monotonic() - self._start + + def fmt(self, duration): + parts = ", ".join(f"{k}={v:.2f}s" + (f" ({duration/v:.0f}x)" if k == 'render' else "") for k, v in self._sections.items()) + return f"{duration}s in {self.total:.1f}s ({duration/self.total:.1f}x realtime) | {parts}" class CallbackReader: """Wraps a file, but overrides the read method to also diff --git a/system/ui/lib/application.py b/system/ui/lib/application.py index 04cd37af3ecbb8..a585b153d5a8a4 100644 --- a/system/ui/lib/application.py +++ b/system/ui/lib/application.py @@ -1,6 +1,7 @@ import atexit import cffi import os +import queue import time import signal import sys @@ -40,6 +41,9 @@ PROFILE_STATS = int(os.getenv("PROFILE_STATS", "100")) # Number of functions to show in profile output RECORD = os.getenv("RECORD") == "1" RECORD_OUTPUT = str(Path(os.getenv("RECORD_OUTPUT", "output")).with_suffix(".mp4")) +RECORD_BITRATE = os.getenv("RECORD_BITRATE", "") # Target bitrate e.g. "2000k" +RECORD_SPEED = int(os.getenv("RECORD_SPEED", "1")) # Speed multiplier +OFFSCREEN = os.getenv("OFFSCREEN") == "1" # Disable FPS limiting for fast offline rendering GL_VERSION = """ #version 300 es @@ -212,6 +216,9 @@ def __init__(self, width: int | None = None, height: int | None = None): self._render_texture: rl.RenderTexture | None = None self._burn_in_shader: rl.Shader | None = None self._ffmpeg_proc: subprocess.Popen | None = None + self._ffmpeg_queue: queue.Queue | None = None + self._ffmpeg_thread: threading.Thread | None = None + self._ffmpeg_stop_event: threading.Event | None = None self._textures: dict[str, rl.Texture] = {} self._target_fps: int = _DEFAULT_FPS self._last_fps_log_time: float = time.monotonic() @@ -276,25 +283,36 @@ def _close(sig, frame): rl.set_texture_filter(self._render_texture.texture, rl.TextureFilter.TEXTURE_FILTER_BILINEAR) if RECORD: + output_fps = fps * RECORD_SPEED ffmpeg_args = [ 'ffmpeg', '-v', 'warning', # Reduce ffmpeg log spam - '-stats', # Show encoding progress + '-nostats', # Suppress encoding progress '-f', 'rawvideo', # Input format '-pix_fmt', 'rgba', # Input pixel format '-s', f'{self._width}x{self._height}', # Input resolution '-r', str(fps), # Input frame rate '-i', 'pipe:0', # Input from stdin - '-vf', 'vflip,format=yuv420p', # Flip vertically and convert rgba to yuv420p - '-c:v', 'libx264', # Video codec - '-preset', 'ultrafast', # Encoding speed + '-vf', 'vflip,format=yuv420p', # Flip vertically and convert to yuv420p + '-r', str(output_fps), # Output frame rate (for speed multiplier) + '-c:v', 'libx264', + '-preset', 'ultrafast', + ] + if RECORD_BITRATE: + ffmpeg_args += ['-b:v', RECORD_BITRATE, '-maxrate', RECORD_BITRATE, '-bufsize', RECORD_BITRATE] + ffmpeg_args += [ '-y', # Overwrite existing file '-f', 'mp4', # Output format RECORD_OUTPUT, # Output file path ] self._ffmpeg_proc = subprocess.Popen(ffmpeg_args, stdin=subprocess.PIPE) + self._ffmpeg_queue = queue.Queue(maxsize=60) # Buffer up to 60 frames + self._ffmpeg_stop_event = threading.Event() + self._ffmpeg_thread = threading.Thread(target=self._ffmpeg_writer_thread, daemon=True) + self._ffmpeg_thread.start() - rl.set_target_fps(fps) + # OFFSCREEN disables FPS limiting for fast offline rendering (e.g. clips) + rl.set_target_fps(0 if OFFSCREEN else fps) self._target_fps = fps self._set_styles() @@ -336,6 +354,21 @@ def _startup_profile_context(self): print(f"{green}UI window ready in {elapsed_ms:.1f} ms{reset}") sys.exit(0) + def _ffmpeg_writer_thread(self): + """Background thread that writes frames to ffmpeg.""" + while True: + try: + data = self._ffmpeg_queue.get(timeout=1.0) + if data is None: # Sentinel to stop + break + self._ffmpeg_proc.stdin.write(data) + except queue.Empty: + if self._ffmpeg_stop_event.is_set(): + break + continue + except Exception: + break + def set_modal_overlay(self, overlay, callback: Callable | None = None): if self._modal_overlay.overlay is not None: if hasattr(self._modal_overlay.overlay, 'hide_event'): @@ -408,11 +441,17 @@ def _load_texture_from_image(self, image: rl.Image) -> rl.Texture: return texture def close_ffmpeg(self): + if self._ffmpeg_thread is not None: + # Send sentinel to signal end of frames, let thread drain queue + self._ffmpeg_queue.put(None) + self._ffmpeg_thread.join(timeout=30) + self._ffmpeg_stop_event.set() + if self._ffmpeg_proc is not None: self._ffmpeg_proc.stdin.flush() self._ffmpeg_proc.stdin.close() try: - self._ffmpeg_proc.wait(timeout=5) + self._ffmpeg_proc.wait(timeout=30) except subprocess.TimeoutExpired: self._ffmpeg_proc.terminate() self._ffmpeg_proc.wait() @@ -524,8 +563,7 @@ def render(self): image = rl.load_image_from_texture(self._render_texture.texture) data_size = image.width * image.height * 4 data = bytes(rl.ffi.buffer(image.data, data_size)) - self._ffmpeg_proc.stdin.write(data) - self._ffmpeg_proc.stdin.flush() + self._ffmpeg_queue.put(data) # Async write via background thread rl.unload_image(image) self._monitor_fps() diff --git a/tools/clip/run.py b/tools/clip/run.py index 9045a4381b1bc7..4335bca375f95f 100755 --- a/tools/clip/run.py +++ b/tools/clip/run.py @@ -1,310 +1,330 @@ #!/usr/bin/env python3 - -import logging import os -import platform -import shutil import sys import time -from argparse import ArgumentParser, ArgumentTypeError -from collections.abc import Sequence +import logging +import subprocess +import threading +import queue +import multiprocessing +import itertools +import numpy as np +import tqdm +from argparse import ArgumentParser from pathlib import Path -from random import randint -from subprocess import Popen -from typing import Literal +from concurrent.futures import ThreadPoolExecutor, as_completed -from cereal.messaging import SubMaster -from openpilot.common.basedir import BASEDIR -from openpilot.common.params import Params, UnknownKeyName -from openpilot.common.prefix import OpenpilotPrefix -from openpilot.common.utils import managed_proc from openpilot.tools.lib.route import Route from openpilot.tools.lib.logreader import LogReader +from openpilot.tools.lib.filereader import FileReader +from openpilot.tools.lib.framereader import FrameReader, ffprobe +from openpilot.selfdrive.test.process_replay.migration import migrate_all +from openpilot.common.prefix import OpenpilotPrefix +from openpilot.common.utils import Timer +from msgq.visionipc import VisionIpcServer, VisionStreamType -DEFAULT_OUTPUT = 'output.mp4' -DEMO_START = 90 -DEMO_END = 105 -DEMO_ROUTE = 'a2a0ccea32023010/2023-07-27--13-01-19' FRAMERATE = 20 -PIXEL_DEPTH = '24' -RESOLUTION = '2160x1080' -SECONDS_TO_WARM = 2 -PROC_WAIT_SECONDS = 30*10 - -OPENPILOT_FONT = str(Path(BASEDIR, 'selfdrive/assets/fonts/Inter-Regular.ttf').resolve()) -REPLAY = str(Path(BASEDIR, 'tools/replay/replay').resolve()) -UI = str(Path(BASEDIR, 'selfdrive/ui/ui').resolve()) - -logger = logging.getLogger('clip.py') - - -def check_for_failure(procs: list[Popen]): - for proc in procs: - exit_code = proc.poll() - if exit_code is not None and exit_code != 0: - cmd = str(proc.args) - if isinstance(proc.args, str): - cmd = proc.args - elif isinstance(proc.args, Sequence): - cmd = str(proc.args[0]) - msg = f'{cmd} failed, exit code {exit_code}' - logger.error(msg) - stdout, stderr = proc.communicate() - if stdout: - logger.error(stdout.decode()) - if stderr: - logger.error(stderr.decode()) - raise ChildProcessError(msg) - - -def escape_ffmpeg_text(value: str): - special_chars = {',': '\\,', ':': '\\:', '=': '\\=', '[': '\\[', ']': '\\]'} - value = value.replace('\\', '\\\\\\\\\\\\\\\\') - for char, escaped in special_chars.items(): - value = value.replace(char, escaped) - return value - - -def get_logreader(route: Route): - return LogReader(route.qlog_paths()[0] if len(route.qlog_paths()) else route.name.canonical_name) - - -def get_meta_text(lr: LogReader, route: Route): - init_data = lr.first('initData') - car_params = lr.first('carParams') - origin_parts = init_data.gitRemote.split('/') - origin = origin_parts[3] if len(origin_parts) > 3 else 'unknown' - return ', '.join([ - f"openpilot v{init_data.version}", - f"route: {route.name.canonical_name}", - f"car: {car_params.carFingerprint}", - f"origin: {origin}", - f"branch: {init_data.gitBranch}", - f"commit: {init_data.gitCommit[:7]}", - f"modified: {str(init_data.dirty).lower()}", - ]) - - -def parse_args(parser: ArgumentParser): +DEMO_ROUTE, DEMO_START, DEMO_END = 'a2a0ccea32023010/2023-07-27--13-01-19', 90, 105 + +logger = logging.getLogger('clip') + + +def parse_args(): + parser = ArgumentParser(description="Direct clip renderer") + parser.add_argument("route", nargs="?", help="Route ID (dongle/route or dongle/route/start/end)") + parser.add_argument("-s", "--start", type=int, help="Start time in seconds") + parser.add_argument("-e", "--end", type=int, help="End time in seconds") + parser.add_argument("-o", "--output", default="output.mp4", help="Output file path") + parser.add_argument("-d", "--data-dir", help="Local directory with route data") + parser.add_argument("-t", "--title", help="Title overlay text") + parser.add_argument("-f", "--file-size", type=float, default=9.0, help="Target file size in MB") + parser.add_argument("-x", "--speed", type=int, default=1, help="Speed multiplier") + parser.add_argument("--demo", action="store_true", help="Use demo route with default timing") + parser.add_argument("--big", action="store_true", default=True, help="Use big UI (2160x1080)") + parser.add_argument("--qcam", action="store_true", help="Use qcamera instead of fcamera") + parser.add_argument("--windowed", action="store_true", help="Show window") + parser.add_argument("--no-metadata", action="store_true", help="Disable metadata overlay") + parser.add_argument("--no-time-overlay", action="store_true", help="Disable time overlay") args = parser.parse_args() + if args.demo: - args.route = DEMO_ROUTE - if args.start is None or args.end is None: - args.start = DEMO_START - args.end = DEMO_END - elif args.route.count('/') == 1: - if args.start is None or args.end is None: - parser.error('must provide both start and end if timing is not in the route ID') - elif args.route.count('/') == 3: - if args.start is not None or args.end is not None: - parser.error('don\'t provide timing when including it in the route ID') + args.route, args.start, args.end = args.route or DEMO_ROUTE, args.start or DEMO_START, args.end or DEMO_END + elif not args.route: + parser.error("route is required (or use --demo)") + + if args.route and args.route.count('/') == 3: parts = args.route.split('/') - args.route = '/'.join(parts[:2]) - args.start = int(parts[2]) - args.end = int(parts[3]) - if args.end <= args.start: - parser.error(f'end ({args.end}) must be greater than start ({args.start})') - if args.start < SECONDS_TO_WARM: - parser.error(f'start must be greater than {SECONDS_TO_WARM}s to allow the UI time to warm up') - - try: - args.route = Route(args.route, data_dir=args.data_dir) - except Exception as e: - parser.error(f'failed to get route: {e}') - - # FIXME: length isn't exactly max segment seconds, simplify to replay exiting at end of data - length = round(args.route.max_seg_number * 60) - if args.start >= length: - parser.error(f'start ({args.start}s) cannot be after end of route ({length}s)') - if args.end > length: - parser.error(f'end ({args.end}s) cannot be after end of route ({length}s)') + args.route, args.start, args.end = '/'.join(parts[:2]), args.start or int(parts[2]), args.end or int(parts[3]) + if args.start is None or args.end is None: + parser.error("--start and --end are required") + if args.end <= args.start: + parser.error(f"end ({args.end}) must be greater than start ({args.start})") return args -def populate_car_params(lr: LogReader): - init_data = lr.first('initData') - assert init_data is not None +def setup_env(output_path: str, big: bool = False, speed: int = 1, target_mb: float = 0, duration: int = 0): + os.environ.update({"RECORD": "1", "OFFSCREEN": "1", "RECORD_OUTPUT": str(Path(output_path).with_suffix(".mp4"))}) + if speed > 1: + os.environ["RECORD_SPEED"] = str(speed) + if target_mb > 0 and duration > 0: + os.environ["RECORD_BITRATE"] = f"{int(target_mb * 8 * 1024 / (duration / speed))}k" + if big: + os.environ["BIG"] = "1" + + +def _download_segment(path: str) -> bytes: + from openpilot.tools.lib.filereader import FileReader + with FileReader(path) as f: + return bytes(f.read()) + + +def _parse_and_chunk_segment(args: tuple) -> list[dict]: + raw_data, fps = args[0], args[2] + from openpilot.tools.lib.logreader import _LogFileReader + messages = migrate_all(list(_LogFileReader("", dat=raw_data, sort_by_time=True))) + if not messages: + return [] + + dt_ns, chunks, current, next_time = 1e9 / fps, [], {}, messages[0].logMonoTime + 1e9 / fps # type: ignore[var-annotated] + for msg in messages: + if msg.logMonoTime >= next_time: + chunks.append(current) + current, next_time = {}, next_time + dt_ns * ((msg.logMonoTime - next_time) // dt_ns + 1) + current[msg.which()] = msg + return chunks + [current] if current else chunks + + +def load_logs_parallel(log_paths: list[str], fps: int = 20) -> list[dict]: + num_workers = min(16, len(log_paths), (multiprocessing.cpu_count() or 1)) + logger.info(f"Downloading {len(log_paths)} segments with {num_workers} workers...") + + with ThreadPoolExecutor(max_workers=num_workers) as pool: + futures = {pool.submit(_download_segment, path): idx for idx, path in enumerate(log_paths)} + raw_data = {futures[f]: f.result() for f in as_completed(futures)} + + logger.info("Parsing and chunking segments...") + with multiprocessing.Pool(num_workers) as pool: + return list(itertools.chain.from_iterable(pool.map(_parse_and_chunk_segment, [(raw_data[i], i, fps) for i in range(len(log_paths))]))) + + +def patch_submaster(message_chunks, ui_state): + # Reset started_frame so alerts render correctly (recv_frame must be >= started_frame) + ui_state.started_frame = 0 + ui_state.started_time = time.monotonic() + + def mock_update(timeout=None): + sm, t = ui_state.sm, time.monotonic() + sm.updated = dict.fromkeys(sm.services, False) + if sm.frame < len(message_chunks): + for svc, msg in message_chunks[sm.frame].items(): + if svc in sm.data: + sm.seen[svc] = sm.updated[svc] = sm.alive[svc] = sm.valid[svc] = True + sm.data[svc] = getattr(msg.as_builder(), svc) + sm.logMonoTime[svc], sm.recv_time[svc], sm.recv_frame[svc] = msg.logMonoTime, t, sm.frame + sm.frame += 1 + ui_state.sm.update = mock_update + + +def get_frame_dimensions(camera_path: str) -> tuple[int, int]: + """Get frame dimensions from a video file using ffprobe.""" + probe = ffprobe(camera_path) + stream = probe["streams"][0] + return stream["width"], stream["height"] + + +def iter_segment_frames(camera_paths, start_time, end_time, fps=20, use_qcam=False, frame_size: tuple[int, int] | None = None): + frames_per_seg = fps * 60 + start_frame, end_frame = int(start_time * fps), int(end_time * fps) + current_seg: int = -1 + seg_frames: FrameReader | np.ndarray | None = None + + for global_idx in range(start_frame, end_frame): + seg_idx, local_idx = global_idx // frames_per_seg, global_idx % frames_per_seg + + if seg_idx != current_seg: + current_seg = seg_idx + path = camera_paths[seg_idx] if seg_idx < len(camera_paths) else None + if not path: + raise RuntimeError(f"No camera file for segment {seg_idx}") + + if use_qcam: + w, h = frame_size or get_frame_dimensions(path) + with FileReader(path) as f: + result = subprocess.run(["ffmpeg", "-v", "quiet", "-i", "-", "-f", "rawvideo", "-pix_fmt", "nv12", "-"], + input=f.read(), capture_output=True) + if result.returncode != 0: + raise RuntimeError(f"ffmpeg failed: {result.stderr.decode()}") + seg_frames = np.frombuffer(result.stdout, dtype=np.uint8).reshape(-1, w * h * 3 // 2) + else: + seg_frames = FrameReader(path, pix_fmt="nv12") + + assert seg_frames is not None + frame = seg_frames[local_idx] if use_qcam else seg_frames.get(local_idx) # type: ignore[index, union-attr] + yield global_idx, frame + + +class FrameQueue: + def __init__(self, camera_paths, start_time, end_time, fps=20, prefetch_count=60, use_qcam=False): + # Probe first valid camera file for dimensions + first_path = next((p for p in camera_paths if p), None) + if not first_path: + raise RuntimeError("No valid camera paths") + self.frame_w, self.frame_h = get_frame_dimensions(first_path) + + self._queue, self._stop, self._error = queue.Queue(maxsize=prefetch_count), threading.Event(), None + self._thread = threading.Thread(target=self._worker, + args=(camera_paths, start_time, end_time, fps, use_qcam, (self.frame_w, self.frame_h)), daemon=True) + self._thread.start() + + def _worker(self, camera_paths, start_time, end_time, fps, use_qcam, frame_size): + try: + for idx, data in iter_segment_frames(camera_paths, start_time, end_time, fps, use_qcam, frame_size): + if self._stop.is_set(): + break + self._queue.put((idx, data.tobytes())) + except Exception as e: + logger.exception("Decode error") + self._error = e + finally: + self._queue.put(None) + + def get(self, timeout=60.0): + if self._error: + raise self._error + result = self._queue.get(timeout=timeout) + if result is None: + raise StopIteration("No more frames") + return result + + def stop(self): + self._stop.set() + while not self._queue.empty(): + try: + self._queue.get_nowait() + except queue.Empty: + break + self._thread.join(timeout=2.0) + + +def load_route_metadata(route): + from openpilot.common.params import Params, UnknownKeyName + lr = LogReader(route.log_paths()[0]) + init_data, car_params = lr.first('initData'), lr.first('carParams') params = Params() - entries = init_data.params.entries - for cp in entries: - key, value = cp.key, cp.value + for entry in init_data.params.entries: try: - params.put(key, params.cpp2python(key, value)) + params.put(entry.key, params.cpp2python(entry.key, entry.value)) except UnknownKeyName: - # forks of openpilot may have other Params keys configured. ignore these - logger.warning(f"unknown Params key '{key}', skipping") - logger.debug('persisted CarParams') - - -def validate_env(parser: ArgumentParser): - if platform.system() not in ['Linux']: - parser.exit(1, f'clip.py: error: {platform.system()} is not a supported operating system\n') - for proc in ['Xvfb', 'ffmpeg']: - if shutil.which(proc) is None: - parser.exit(1, f'clip.py: error: missing {proc} command, is it installed?\n') - for proc in [REPLAY, UI]: - if shutil.which(proc) is None: - parser.exit(1, f'clip.py: error: missing {proc} command, did you build openpilot yet?\n') - - -def validate_output_file(output_file: str): - if not output_file.endswith('.mp4'): - raise ArgumentTypeError('output must be an mp4') - return output_file - - -def validate_route(route: str): - if route.count('/') not in (1, 3): - raise ArgumentTypeError(f'route must include or exclude timing, example: {DEMO_ROUTE}') - return route - - -def validate_title(title: str): - if len(title) > 80: - raise ArgumentTypeError('title must be no longer than 80 chars') - return title - - -def wait_for_frames(procs: list[Popen]): - sm = SubMaster(['uiDebug']) - no_frames_drawn = True - while no_frames_drawn: - sm.update() - no_frames_drawn = sm['uiDebug'].drawTimeMillis == 0. - check_for_failure(procs) - - -def clip( - data_dir: str | None, - quality: Literal['low', 'high'], - prefix: str, - route: Route, - out: str, - start: int, - end: int, - speed: int, - target_mb: int, - title: str | None, -): - logger.info(f'clipping route {route.name.canonical_name}, start={start} end={end} quality={quality} target_filesize={target_mb}MB') - lr = get_logreader(route) - - begin_at = max(start - SECONDS_TO_WARM, 0) - duration = end - start - bit_rate_kbps = int(round(target_mb * 8 * 1024 * 1024 / duration / 1000)) - - # TODO: evaluate creating fn that inspects /tmp/.X11-unix and creates unused display to avoid possibility of collision - display = f':{randint(99, 999)}' - - box_style = 'box=1:boxcolor=black@0.33:boxborderw=7' - meta_text = get_meta_text(lr, route) - overlays = [ - # metadata overlay - f"drawtext=text='{escape_ffmpeg_text(meta_text)}':fontfile={OPENPILOT_FONT}:fontcolor=white:fontsize=15:{box_style}:x=(w-text_w)/2:y=5.5:enable='between(t,1,5)'", - # route time overlay - f"drawtext=text='%{{eif\\:floor(({start}+t)/60)\\:d\\:2}}\\:%{{eif\\:mod({start}+t\\,60)\\:d\\:2}}':fontfile={OPENPILOT_FONT}:fontcolor=white:fontsize=24:{box_style}:x=w-text_w-38:y=38" - ] - if title: - overlays.append(f"drawtext=text='{escape_ffmpeg_text(title)}':fontfile={OPENPILOT_FONT}:fontcolor=white:fontsize=32:{box_style}:x=(w-text_w)/2:y=53") + pass + + origin = init_data.gitRemote.split('/')[3] if len(init_data.gitRemote.split('/')) > 3 else 'unknown' + return { + 'version': init_data.version, 'route': route.name.canonical_name, + 'car': car_params.carFingerprint if car_params else 'unknown', 'origin': origin, + 'branch': init_data.gitBranch, 'commit': init_data.gitCommit[:7], 'modified': str(init_data.dirty).lower(), + } + + +def draw_text_box(rl, text, x, y, size, gui_app, color=None, center=False): + box_color, white = rl.Color(0, 0, 0, 85), color or rl.WHITE + text_width = rl.measure_text(text, size) + if center: + x = (gui_app.width - text_width) // 2 + rl.draw_rectangle(x - 8, y - 4, text_width + 16, size + 8, box_color) + rl.draw_text(text, x, y, size, white) - if speed > 1: - overlays += [ - f"setpts=PTS/{speed}", - "fps=60", - ] - - ffmpeg_cmd = [ - 'ffmpeg', '-y', - '-video_size', RESOLUTION, - '-framerate', str(FRAMERATE), - '-f', 'x11grab', - '-rtbufsize', '100M', - '-draw_mouse', '0', - '-i', display, - '-c:v', 'libx264', - '-maxrate', f'{bit_rate_kbps}k', - '-bufsize', f'{bit_rate_kbps*2}k', - '-crf', '23', - '-filter:v', ','.join(overlays), - '-preset', 'ultrafast', - '-tune', 'zerolatency', - '-pix_fmt', 'yuv420p', - '-movflags', '+faststart', - '-f', 'mp4', - '-t', str(duration), - out, - ] - - replay_cmd = [REPLAY, '--ecam', '-c', '1', '-s', str(begin_at), '--prefix', prefix] - if data_dir: - replay_cmd.extend(['--data_dir', data_dir]) - if quality == 'low': - replay_cmd.append('--qcam') - replay_cmd.append(route.name.canonical_name) - - ui_cmd = [UI, '-platform', 'xcb'] - xvfb_cmd = ['Xvfb', display, '-terminate', '-screen', '0', f'{RESOLUTION}x{PIXEL_DEPTH}'] - - with OpenpilotPrefix(prefix, shared_download_cache=True): - populate_car_params(lr) - env = os.environ.copy() - env['DISPLAY'] = display - - with managed_proc(xvfb_cmd, env) as xvfb_proc, managed_proc(ui_cmd, env) as ui_proc, managed_proc(replay_cmd, env) as replay_proc: - procs = [xvfb_proc, ui_proc, replay_proc] - logger.info('waiting for replay to begin (loading segments, may take a while)...') - wait_for_frames(procs) - logger.debug(f'letting UI warm up ({SECONDS_TO_WARM}s)...') - time.sleep(SECONDS_TO_WARM) - check_for_failure(procs) - with managed_proc(ffmpeg_cmd, env) as ffmpeg_proc: - procs.append(ffmpeg_proc) - logger.info(f'recording in progress ({duration}s)...') - ffmpeg_proc.wait(duration + PROC_WAIT_SECONDS) - check_for_failure(procs) - logger.info(f'recording complete: {Path(out).resolve()}') + +def render_overlays(rl, gui_app, metadata, title, start_time, frame_idx, show_metadata, show_time): + if show_metadata and metadata and frame_idx < FRAMERATE * 5: + m = metadata + text = ", ".join([f"openpilot v{m['version']}", f"route: {m['route']}", f"car: {m['car']}", f"origin: {m['origin']}", + f"branch: {m['branch']}", f"commit: {m['commit']}", f"modified: {m['modified']}"]) + draw_text_box(rl, text, 0, 8, 26, gui_app, center=True) + + if title: + draw_text_box(rl, title, 0, 60, 48, gui_app, center=True) + + if show_time: + t = start_time + frame_idx / FRAMERATE + draw_text_box(rl, f"{int(t)//60:02d}:{int(t)%60:02d}", gui_app.width - rl.measure_text(f"{int(t)//60:02d}:{int(t)%60:02d}", 36) - 45, 45, 36, gui_app) + + +def clip(route: Route, output: str, start: int, end: int, headless: bool = True, big: bool = False, + title: str | None = None, show_metadata: bool = True, show_time: bool = True, use_qcam: bool = False): + timer, duration = Timer(), end - start + + import pyray as rl + if big: + from openpilot.selfdrive.ui.layouts.main import MainLayout + else: + from openpilot.selfdrive.ui.mici.layouts.main import MiciMainLayout as MainLayout # type: ignore[assignment] + from openpilot.selfdrive.ui.ui_state import ui_state + from openpilot.system.ui.lib.application import gui_app + timer.lap("import") + + logger.info(f"Clipping {route.name.canonical_name}, {start}s-{end}s ({duration}s)") + seg_start, seg_end = start // 60, (end - 1) // 60 + 1 + all_chunks = load_logs_parallel(route.log_paths()[seg_start:seg_end], fps=FRAMERATE) + timer.lap("logs") + + frame_start = (start - seg_start * 60) * FRAMERATE + message_chunks = all_chunks[frame_start:frame_start + duration * FRAMERATE] + if not message_chunks: + logger.error("No messages to render") + sys.exit(1) + + metadata = load_route_metadata(route) if show_metadata else None + if headless: + rl.set_config_flags(rl.ConfigFlags.FLAG_WINDOW_HIDDEN) + + with OpenpilotPrefix(shared_download_cache=True): + camera_paths = route.qcamera_paths() if use_qcam else route.camera_paths() + frame_queue = FrameQueue(camera_paths, start, end, fps=FRAMERATE, use_qcam=use_qcam) + + vipc = VisionIpcServer("camerad") + vipc.create_buffers(VisionStreamType.VISION_STREAM_ROAD, 4, frame_queue.frame_w, frame_queue.frame_h) + vipc.start_listener() + + patch_submaster(message_chunks, ui_state) + gui_app.init_window("clip", fps=FRAMERATE) + + main_layout = MainLayout() + main_layout.set_rect(rl.Rectangle(0, 0, gui_app.width, gui_app.height)) + timer.lap("setup") + + frame_idx = 0 + with tqdm.tqdm(total=len(message_chunks), desc="Rendering", unit="frame") as pbar: + for should_render in gui_app.render(): + if frame_idx >= len(message_chunks): + break + _, frame_bytes = frame_queue.get() + vipc.send(VisionStreamType.VISION_STREAM_ROAD, frame_bytes, frame_idx, int(frame_idx * 5e7), int(frame_idx * 5e7)) + ui_state.update() + if should_render: + main_layout.render() + render_overlays(rl, gui_app, metadata, title, start, frame_idx, show_metadata, show_time) + frame_idx += 1 + pbar.update(1) + timer.lap("render") + + frame_queue.stop() + gui_app.close() + timer.lap("ffmpeg") + + logger.info(f"Clip saved to: {Path(output).resolve()}") + logger.info(f"Generated {timer.fmt(duration)}") def main(): - p = ArgumentParser(prog='clip.py', description='clip your openpilot route.', epilog='comma.ai') - validate_env(p) - route_group = p.add_mutually_exclusive_group(required=True) - route_group.add_argument('route', nargs='?', type=validate_route, help=f'The route (e.g. {DEMO_ROUTE} or {DEMO_ROUTE}/{DEMO_START}/{DEMO_END})') - route_group.add_argument('--demo', help='use the demo route', action='store_true') - p.add_argument('-d', '--data-dir', help='local directory where route data is stored') - p.add_argument('-e', '--end', help='stop clipping at seconds', type=int) - p.add_argument('-f', '--file-size', help='target file size (Discord/GitHub support max 10MB, default is 9MB)', type=float, default=9.) - p.add_argument('-o', '--output', help='output clip to (.mp4)', type=validate_output_file, default=DEFAULT_OUTPUT) - p.add_argument('-p', '--prefix', help='openpilot prefix', default=f'clip_{randint(100, 99999)}') - p.add_argument('-q', '--quality', help='quality of camera (low = qcam, high = hevc)', choices=['low', 'high'], default='high') - p.add_argument('-x', '--speed', help='record the clip at this speed multiple', type=int, default=1) - p.add_argument('-s', '--start', help='start clipping at seconds', type=int) - p.add_argument('-t', '--title', help='overlay this title on the video (e.g. "Chill driving across the Golden Gate Bridge")', type=validate_title) - args = parse_args(p) - exit_code = 1 - try: - clip( - data_dir=args.data_dir, - quality=args.quality, - prefix=args.prefix, - route=args.route, - out=args.output, - start=args.start, - end=args.end, - speed=args.speed, - target_mb=args.file_size, - title=args.title, - ) - exit_code = 0 - except KeyboardInterrupt as e: - logger.exception('interrupted by user', exc_info=e) - except Exception as e: - logger.exception('encountered error', exc_info=e) - sys.exit(exit_code) - - -if __name__ == '__main__': - logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s %(levelname)s\t%(message)s') + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s\t%(message)s") + args = parse_args() + assert args.big, "Clips doesn't support mici UI yet. TODO: make it work" + + setup_env(args.output, big=args.big, speed=args.speed, target_mb=args.file_size, duration=args.end - args.start) + clip(Route(args.route, data_dir=args.data_dir), args.output, args.start, args.end, not args.windowed, + args.big, args.title, not args.no_metadata, not args.no_time_overlay, args.qcam) + +if __name__ == "__main__": main()