diff --git a/tools/replay/camera.py b/tools/replay/camera.py new file mode 100755 index 00000000000000..71d900de339be2 --- /dev/null +++ b/tools/replay/camera.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +import logging +import queue +import threading +import time +from enum import Enum +from typing import Optional + +import numpy as np + +from msgq.visionipc import VisionIpcServer, VisionStreamType +from openpilot.tools.lib.framereader import FrameReader + +log = logging.getLogger("replay") + +BUFFER_COUNT = 40 + + +def get_nv12_info(width: int, height: int) -> tuple[int, int, int]: + """Calculate NV12 buffer parameters matching C++ VENUS macros.""" + # VENUS_Y_STRIDE for NV12: align to 128 + nv12_width = (width + 127) & ~127 + # VENUS_Y_SCANLINES for NV12: align to 32 + nv12_height = (height + 31) & ~31 + # Buffer size from v4l2_format (matches C++ implementation) + nv12_buffer_size = 2346 * nv12_width + return nv12_width, nv12_height, nv12_buffer_size + + +def repack_nv12_to_venus(yuv: np.ndarray, width: int, height: int, stride: int) -> np.ndarray: + """Repack NV12 data from unpadded to VENUS-aligned stride. + + FrameReader returns NV12 with original width as stride. + VisionIPC expects VENUS-aligned stride (128-byte aligned). + """ + y_plane_size = width * height + + # Reshape into planes + y_plane = yuv[:y_plane_size].reshape(height, width) + uv_plane = yuv[y_plane_size:].reshape(height // 2, width) + + # Calculate VENUS-aligned scanlines + y_scanlines = (height + 31) & ~31 + uv_scanlines = (height // 2 + 31) & ~31 + + # Create padded planes with VENUS stride using numpy pad + # Pad width: (0 padding before, stride-width padding after) for axis 1 + y_padded = np.pad(y_plane, ((0, y_scanlines - height), (0, stride - width)), mode='constant') + uv_padded = np.pad(uv_plane, ((0, uv_scanlines - height // 2), (0, stride - width)), mode='constant') + + # Concatenate and flatten + return np.concatenate([y_padded.ravel(), uv_padded.ravel()]) + + +class CameraType(Enum): + ROAD = 0 + DRIVER = 1 + WIDE_ROAD = 2 + + +CAMERA_STREAM_TYPES = { + CameraType.ROAD: VisionStreamType.VISION_STREAM_ROAD, + CameraType.DRIVER: VisionStreamType.VISION_STREAM_DRIVER, + CameraType.WIDE_ROAD: VisionStreamType.VISION_STREAM_WIDE_ROAD, +} + + +class Camera: + def __init__(self, cam_type: CameraType): + self.type = cam_type + self.stream_type = CAMERA_STREAM_TYPES[cam_type] + self.width = 0 + self.height = 0 + self.nv12_stride = 0 # VENUS-aligned stride + self.nv12_buffer_size = 0 # Padded buffer size for VisionIPC + self.thread: Optional[threading.Thread] = None + self.queue: queue.Queue = queue.Queue() + self.prefetch_up_to: int = -1 # Highest frame index we've prefetched + + +class CameraServer: + def __init__(self, camera_sizes: Optional[dict[CameraType, tuple[int, int]]] = None): + self._cameras = { + CameraType.ROAD: Camera(CameraType.ROAD), + CameraType.DRIVER: Camera(CameraType.DRIVER), + CameraType.WIDE_ROAD: Camera(CameraType.WIDE_ROAD), + } + + if camera_sizes: + for cam_type, (w, h) in camera_sizes.items(): + self._cameras[cam_type].width = w + self._cameras[cam_type].height = h + + self._vipc_server: Optional[VisionIpcServer] = None + self._publishing = 0 + self._publishing_lock = threading.Lock() + self._exit = False + + self._start_vipc_server() + + def __del__(self): + self._exit = True + for cam in self._cameras.values(): + if cam.thread is not None and cam.thread.is_alive(): + # Signal termination + cam.queue.put(None) + cam.thread.join() + + def _start_vipc_server(self) -> None: + 
self._vipc_server = VisionIpcServer("camerad") + + for cam in self._cameras.values(): + if cam.width > 0 and cam.height > 0: + nv12_width, nv12_height, nv12_buffer_size = get_nv12_info(cam.width, cam.height) + cam.nv12_stride = nv12_width + cam.nv12_buffer_size = nv12_buffer_size + log.info(f"camera[{cam.type.name}] frame size {cam.width}x{cam.height}, stride {nv12_width}, buffer {nv12_buffer_size}") + self._vipc_server.create_buffers_with_sizes( + cam.stream_type, BUFFER_COUNT, cam.width, cam.height, + nv12_buffer_size, nv12_width, nv12_width * nv12_height + ) + + if cam.thread is None or not cam.thread.is_alive(): + cam.thread = threading.Thread( + target=self._camera_thread, + args=(cam,), + daemon=True + ) + cam.thread.start() + + self._vipc_server.start_listener() + + def _camera_thread(self, cam: Camera) -> None: + current_fr: Optional[FrameReader] = None + prefetch_ahead = 60 # Stay 2 GOPs ahead + + while not self._exit: + # Try to get next frame request, but don't block long - we want to prefetch + try: + item = cam.queue.get(timeout=0.005) # 5ms timeout for responsive prefetching + except queue.Empty: + # No frame requested - use idle time to prefetch + if current_fr is not None and cam.prefetch_up_to < current_fr.frame_count - 1: + # Prefetch next frame sequentially + cam.prefetch_up_to += 1 + self._get_frame(current_fr, cam.prefetch_up_to) + continue + + if item is None: # Termination signal + break + + fr, event = item + current_fr = fr + + try: + # Get encode index from the event + eidx = event.roadEncodeIdx if cam.type == CameraType.ROAD else \ + event.driverEncodeIdx if cam.type == CameraType.DRIVER else \ + event.wideRoadEncodeIdx + + local_frame_idx = eidx.segmentId # segmentId is actually the local frame index within segment + frame_id = eidx.frameId + + # Update prefetch target if we've caught up + if cam.prefetch_up_to < local_frame_idx: + cam.prefetch_up_to = local_frame_idx + + # Get the frame (should be cached from prefetch) + yuv = self._get_frame(fr, local_frame_idx) + if yuv is not None: + # Repack from unpadded NV12 to VENUS-aligned stride + yuv_venus = repack_nv12_to_venus(yuv, cam.width, cam.height, cam.nv12_stride) + yuv_bytes = yuv_venus.tobytes() + # Pad to match the full buffer size expected by VisionIPC + if len(yuv_bytes) < cam.nv12_buffer_size: + yuv_bytes = yuv_bytes + bytes(cam.nv12_buffer_size - len(yuv_bytes)) + + timestamp_sof = eidx.timestampSof + timestamp_eof = eidx.timestampEof + self._vipc_server.send(cam.stream_type, yuv_bytes, frame_id, timestamp_sof, timestamp_eof) + + # Aggressively prefetch if we're not far enough ahead + # This ensures we decode the next GOP before we need it + target = min(local_frame_idx + prefetch_ahead, fr.frame_count - 1) + while cam.prefetch_up_to < target and cam.queue.empty(): + cam.prefetch_up_to += 1 + self._get_frame(fr, cam.prefetch_up_to) + + except Exception: + log.exception(f"camera[{cam.type.name}] error") + + with self._publishing_lock: + self._publishing -= 1 + + def _get_frame(self, fr: FrameReader, local_idx: int) -> Optional[np.ndarray]: + """Get frame from FrameReader. 
FrameReader has its own LRU cache.""" + try: + if local_idx < fr.frame_count: + return fr.get(local_idx) + except Exception as e: + log.warning(f"Failed to decode frame {local_idx}: {e}") + return None + + def push_frame(self, cam_type: CameraType, fr: FrameReader, event) -> None: + cam = self._cameras[cam_type] + + # Check if frame size changed + if cam.width != fr.w or cam.height != fr.h: + cam.width = fr.w + cam.height = fr.h + self.wait_for_sent() + self._start_vipc_server() + + with self._publishing_lock: + self._publishing += 1 + cam.queue.put((fr, event)) + + def wait_for_sent(self) -> None: + while True: + with self._publishing_lock: + if self._publishing <= 0: + break + time.sleep(0.001) + + def warm_cache(self, fr: FrameReader, start_frame: int = 0, num_gops: int = 3) -> None: + """Pre-decode frames to warm the cache before playback starts. + + This prevents stutter at the start of playback by ensuring the first + few GOPs are already decoded and cached. + """ + gop_size = 30 + end_frame = min(start_frame + num_gops * gop_size, fr.frame_count) + log.info(f"warming cache: frames {start_frame}-{end_frame}") + for i in range(start_frame, end_frame): + self._get_frame(fr, i) diff --git a/tools/replay/consoleui.py b/tools/replay/consoleui.py new file mode 100755 index 00000000000000..4445236726e32d --- /dev/null +++ b/tools/replay/consoleui.py @@ -0,0 +1,504 @@ +#!/usr/bin/env python3 +import curses +import logging +import signal +import threading +import time +from enum import Enum, auto +from typing import Optional + +import cereal.messaging as messaging + +from openpilot.tools.replay.replay import Replay +from openpilot.tools.replay.timeline import FindFlag, TimelineType + + +class QueueHandler(logging.Handler): + """Logging handler that queues messages for display in curses UI.""" + def __init__(self, queue: list, lock: threading.Lock): + super().__init__() + self._queue = queue + self._lock = lock + + def emit(self, record: logging.LogRecord) -> None: + # Map logging levels to our color scheme + level = 0 # DEBUG + if record.levelno >= logging.ERROR: + level = 2 # Critical/Red + elif record.levelno >= logging.WARNING: + level = 1 # Warning/Yellow + with self._lock: + self._queue.append((level, self.format(record))) + +BORDER_SIZE = 3 + +KEYBOARD_SHORTCUTS = [ + [ + ("s", "+10s"), + ("S", "-10s"), + ("m", "+60s"), + ("M", "-60s"), + ("space", "Pause/Resume"), + ("e", "Next Engagement"), + ("d", "Next Disengagement"), + ("t", "Next User Tag"), + ], + [ + ("i", "Next Info"), + ("w", "Next Warning"), + ("c", "Next Critical"), + ("enter", "Enter seek request"), + ("+/-", "Playback speed"), + ("q", "Exit"), + ], +] + + +class Color(Enum): + DEFAULT = 0 + DEBUG = 1 + YELLOW = 2 + GREEN = 3 + RED = 4 + CYAN = 5 + BRIGHT_WHITE = 6 + ENGAGED = 7 + DISENGAGED = 8 + + +class Status(Enum): + PLAYING = auto() + PAUSED = auto() + + +class Win(Enum): + TITLE = 0 + STATS = 1 + LOG = 2 + LOG_BORDER = 3 + DOWNLOAD_BAR = 4 + TIMELINE = 5 + TIMELINE_DESC = 6 + HELP = 7 + CAR_STATE = 8 + + +SPEED_ARRAY = [0.2, 0.5, 1.0, 2.0, 4.0, 8.0] + + +class ConsoleUI: + def __init__(self, replay: Replay): + self.replay = replay + self.status = Status.PLAYING + self.windows: dict[Win, Optional[curses.window]] = dict.fromkeys(Win, None) + self.max_width = 0 + self.max_height = 0 + + self._lock = threading.Lock() + self._logs: list[tuple[int, str]] = [] + self._progress_cur = 0 + self._progress_total = 0 + self._download_success = False + self._exit = False + + self._sm = messaging.SubMaster(["carState", 
"liveParameters"]) + + # Set up signal handler for clean exit + signal.signal(signal.SIGINT, lambda s, f: setattr(self, '_exit', True)) + + # Set up logging handler to capture logs for display + self._log_handler = QueueHandler(self._logs, self._lock) + self._log_handler.setFormatter(logging.Formatter('%(message)s')) + logging.getLogger("replay").addHandler(self._log_handler) + logging.getLogger("replay").setLevel(logging.DEBUG) + + def _init_curses(self, stdscr) -> None: + self._stdscr = stdscr + curses.curs_set(0) + curses.cbreak() + curses.noecho() + stdscr.keypad(True) + stdscr.nodelay(True) + + # Initialize colors + curses.start_color() + curses.use_default_colors() + curses.init_pair(Color.DEBUG.value, 246, -1) + curses.init_pair(Color.YELLOW.value, curses.COLOR_YELLOW, -1) + curses.init_pair(Color.GREEN.value, curses.COLOR_GREEN, -1) + curses.init_pair(Color.RED.value, curses.COLOR_RED, -1) + curses.init_pair(Color.CYAN.value, curses.COLOR_CYAN, -1) + curses.init_pair(Color.BRIGHT_WHITE.value, curses.COLOR_WHITE, -1) + curses.init_pair(Color.ENGAGED.value, curses.COLOR_GREEN, curses.COLOR_GREEN) + curses.init_pair(Color.DISENGAGED.value, curses.COLOR_BLUE, curses.COLOR_BLUE) + + self._init_windows() + + def _init_windows(self) -> None: + self.max_height, self.max_width = self._stdscr.getmaxyx() + + # Title bar + self.windows[Win.TITLE] = curses.newwin(1, self.max_width, 0, 0) + self.windows[Win.TITLE].bkgd(' ', curses.A_REVERSE) + self.windows[Win.TITLE].addstr(0, 3, "openpilot replay") + + # Stats + self.windows[Win.STATS] = curses.newwin(2, self.max_width - 2 * BORDER_SIZE, 2, BORDER_SIZE) + + # Timeline + self.windows[Win.TIMELINE] = curses.newwin(4, self.max_width - 2 * BORDER_SIZE, 5, BORDER_SIZE) + + # Timeline description + self.windows[Win.TIMELINE_DESC] = curses.newwin(1, 100, 10, BORDER_SIZE) + + # Car state + self.windows[Win.CAR_STATE] = curses.newwin(3, 100, 12, BORDER_SIZE) + + # Download bar + self.windows[Win.DOWNLOAD_BAR] = curses.newwin(1, 100, 16, BORDER_SIZE) + + # Log window + log_height = self.max_height - 27 + if log_height > 4: + self.windows[Win.LOG_BORDER] = curses.newwin(log_height, self.max_width - 2 * (BORDER_SIZE - 1), 17, BORDER_SIZE - 1) + self.windows[Win.LOG_BORDER].box() + self.windows[Win.LOG] = curses.newwin(log_height - 2, self.max_width - 2 * BORDER_SIZE, 18, BORDER_SIZE) + self.windows[Win.LOG].scrollok(True) + + # Help window + if self.max_height >= 23: + self.windows[Win.HELP] = curses.newwin(5, self.max_width - 2 * BORDER_SIZE, self.max_height - 6, BORDER_SIZE) + elif self.max_height >= 17: + self.windows[Win.HELP] = curses.newwin(1, self.max_width - 2 * BORDER_SIZE, self.max_height - 1, BORDER_SIZE) + self.windows[Win.HELP].addstr(0, 0, "Expand screen vertically to list available commands") + + self._stdscr.refresh() + self._display_timeline_desc() + if self.max_height >= 23: + self._display_help() + self._update_summary() + self._update_timeline() + + for win in self.windows.values(): + if win: + win.noutrefresh() + curses.doupdate() + + def _add_str(self, win, text: str, color: Color = Color.DEFAULT, bold: bool = False) -> None: + attrs = 0 + if color != Color.DEFAULT: + attrs |= curses.color_pair(color.value) + if bold: + attrs |= curses.A_BOLD + try: + win.addstr(text, attrs) + except curses.error: + pass # Ignore write errors at edge of window + + def _display_help(self) -> None: + win = self.windows[Win.HELP] + if not win: + return + + for i, row in enumerate(KEYBOARD_SHORTCUTS): + win.move(i * 2, 0) + for key, desc in row: + 
win.attron(curses.A_REVERSE) + win.addstr(f" {key} ") + win.attroff(curses.A_REVERSE) + win.addstr(f" {desc} ") + win.refresh() + + def _display_timeline_desc(self) -> None: + win = self.windows[Win.TIMELINE_DESC] + if not win: + return + + indicators = [ + (Color.ENGAGED, " Engaged ", False), + (Color.DISENGAGED, " Disengaged ", False), + (Color.GREEN, " Info ", True), + (Color.YELLOW, " Warning ", True), + (Color.RED, " Critical ", True), + (Color.CYAN, " User Tag ", True), + ] + + for color, name, bold in indicators: + self._add_str(win, "__", color, bold) + self._add_str(win, name) + win.refresh() + + def _update_summary(self) -> None: + win = self.windows[Win.STATS] + if not win: + return + + route = self.replay.route + if route: + segments = route.segments + win.addstr(0, 0, f"Route: {self.replay._seg_mgr._route_name}, {len(segments)} segments") + win.addstr(1, 0, f"Car Fingerprint: {self.replay.car_fingerprint}") + win.refresh() + + def _update_status(self) -> None: + win = self.windows[Win.CAR_STATE] + if not win: + return + + win.erase() + + self._sm.update(0) + + # Status + status_text = "playing" if self.status == Status.PLAYING else "paused..." + status_color = Color.GREEN if self.status == Status.PLAYING else Color.YELLOW + win.addstr(0, 0, "STATUS: ") + self._add_str(win, status_text, status_color) + win.addstr(" ") + + # Time + cur_ts = self.replay.current_seconds + segment = int(cur_ts / 60) + win.addstr(0, 25, "TIME: ") + self._add_str(win, f"{cur_ts:.1f}s", Color.BRIGHT_WHITE, True) + win.addstr(f" - segment {segment}") + + # Speed + win.addstr(1, 0, "SPEED: ") + try: + v_ego = self._sm["carState"].vEgo + self._add_str(win, f"{v_ego:.2f}", Color.BRIGHT_WHITE) + except Exception: + self._add_str(win, "N/A", Color.YELLOW) + win.addstr(" m/s") + + # Playback speed + win.addstr(1, 25, "PLAYBACK: ") + self._add_str(win, f"{self.replay.speed:.1f}x", Color.BRIGHT_WHITE, True) + + # Timing stats + stats = self.replay.stats + buffer_ms = stats.time_buffer_ns / 1_000_000 + + # Color code the buffer based on how far ahead/behind we are + if buffer_ms >= 0: + buffer_color = Color.GREEN + elif buffer_ms >= -10: + buffer_color = Color.YELLOW + else: + buffer_color = Color.RED + + win.addstr(2, 0, "TIMING: buffer: ") + self._add_str(win, f"{buffer_ms:+.1f}ms", buffer_color) + win.addstr(f" | lags(30s): {stats.lag_count}") + if stats.worst_lag_ns < 0: + worst_ms = stats.worst_lag_ns / 1_000_000 + win.addstr(f" | worst: {worst_ms:.0f}ms") + + win.refresh() + + def _update_timeline(self) -> None: + win = self.windows[Win.TIMELINE] + if not win: + return + + width = self.max_width - 2 * BORDER_SIZE + win.erase() + + # Draw disengaged background + win.attron(curses.color_pair(Color.DISENGAGED.value)) + for row in [1, 2]: + win.move(row, 0) + win.addstr(" " * (width - 1)) + win.attroff(curses.color_pair(Color.DISENGAGED.value)) + + total_sec = self.replay.max_seconds - self.replay.min_seconds + if total_sec <= 0: + win.refresh() + return + + # Draw timeline entries + entries = self.replay.get_timeline() + for entry in entries: + start_pos = int((entry.start_time - self.replay.min_seconds) / total_sec * width) + end_pos = int((entry.end_time - self.replay.min_seconds) / total_sec * width) + start_pos = max(0, min(start_pos, width - 1)) + end_pos = max(0, min(end_pos, width - 1)) + + if entry.type == TimelineType.ENGAGED: + for row in [1, 2]: + win.chgat(row, start_pos, end_pos - start_pos + 1, curses.color_pair(Color.ENGAGED.value)) + elif entry.type == TimelineType.USER_BOOKMARK: + 
win.chgat(3, start_pos, end_pos - start_pos + 1, curses.color_pair(Color.CYAN.value)) + else: + color = Color.GREEN + if entry.type == TimelineType.ALERT_WARNING: + color = Color.YELLOW + elif entry.type == TimelineType.ALERT_CRITICAL: + color = Color.RED + try: + win.chgat(3, start_pos, end_pos - start_pos + 1, curses.color_pair(color.value)) + except curses.error: + pass + + # Draw current position + cur_pos = int((self.replay.current_seconds - self.replay.min_seconds) / total_sec * width) + cur_pos = max(0, min(cur_pos, width - 2)) + try: + win.attron(curses.color_pair(Color.BRIGHT_WHITE.value)) + win.addch(0, cur_pos, curses.ACS_VLINE) + win.addch(3, cur_pos, curses.ACS_VLINE) + win.attroff(curses.color_pair(Color.BRIGHT_WHITE.value)) + except curses.error: + pass + + win.refresh() + + def _update_progress_bar(self) -> None: + win = self.windows[Win.DOWNLOAD_BAR] + if not win: + return + + win.erase() + with self._lock: + if self._download_success and self._progress_cur < self._progress_total: + width = 35 + progress = self._progress_cur / self._progress_total + pos = int(width * progress) + bar = "=" * pos + ">" + " " * (width - pos) + win.addstr(0, 0, f"Downloading [{bar}] {int(progress * 100)}%") + win.refresh() + + def _log_message(self, msg: str, color: Color = Color.DEFAULT) -> None: + win = self.windows[Win.LOG] + if win: + self._add_str(win, msg + "\n", color) + win.refresh() + + def _pause_replay(self, pause: bool) -> None: + self.replay.pause(pause) + self.status = Status.PAUSED if pause else Status.PLAYING + + def _handle_key(self, key: int) -> bool: + if key == ord('q') or key == ord('Q'): + return False + + if key == ord('\n'): + # Pause and get seek input + self._pause_replay(True) + self._update_status() + curses.curs_set(1) + self._stdscr.nodelay(False) + + self._log_message("Waiting for input...", Color.YELLOW) + y = self.max_height - 9 + self._stdscr.move(y, BORDER_SIZE) + self._add_str(self._stdscr, "Enter seek request (seconds): ", Color.BRIGHT_WHITE, True) + self._stdscr.refresh() + + curses.echo() + try: + input_str = self._stdscr.getstr(y, BORDER_SIZE + 30, 10).decode('utf-8') + choice = int(input_str) + self._pause_replay(False) + self.replay.seek_to(choice, relative=False) + except (ValueError, curses.error): + pass + curses.noecho() + + self._stdscr.move(y, 0) + self._stdscr.clrtoeol() + self._stdscr.nodelay(True) + curses.curs_set(0) + self._stdscr.refresh() + + elif key == ord('+') or key == ord('='): + speed = self.replay.speed + for s in SPEED_ARRAY: + if s > speed: + self._log_message(f"playback speed: {s:.1f}x", Color.YELLOW) + self.replay.speed = s + break + + elif key == ord('-') or key == ord('_'): + speed = self.replay.speed + for s in reversed(SPEED_ARRAY): + if s < speed: + self._log_message(f"playback speed: {s:.1f}x", Color.YELLOW) + self.replay.speed = s + break + + elif key == ord('e'): + self.replay.seek_to_flag(FindFlag.NEXT_ENGAGEMENT) + elif key == ord('d'): + self.replay.seek_to_flag(FindFlag.NEXT_DISENGAGEMENT) + elif key == ord('t'): + self.replay.seek_to_flag(FindFlag.NEXT_USER_BOOKMARK) + elif key == ord('i'): + self.replay.seek_to_flag(FindFlag.NEXT_INFO) + elif key == ord('w'): + self.replay.seek_to_flag(FindFlag.NEXT_WARNING) + elif key == ord('c'): + self.replay.seek_to_flag(FindFlag.NEXT_CRITICAL) + elif key == ord('m'): + self.replay.seek_to(60, relative=True) + elif key == ord('M'): + self.replay.seek_to(-60, relative=True) + elif key == ord('s'): + self.replay.seek_to(10, relative=True) + elif key == ord('S'): + 
self.replay.seek_to(-10, relative=True) + elif key == ord(' '): + self._pause_replay(not self.replay.is_paused) + + return True + + def _main_loop(self, stdscr) -> int: + self._init_curses(stdscr) + + frame = 0 + while not self._exit: + key = stdscr.getch() + if not self._handle_key(key): + break + + if frame % 25 == 0: + # Check for terminal resize + new_h, new_w = stdscr.getmaxyx() + if new_h != self.max_height or new_w != self.max_width: + for win in self.windows.values(): + if win: + try: + del win + except Exception: + pass + stdscr.clear() + stdscr.refresh() + self._init_windows() + + self._update_summary() + + self._update_timeline() + self._update_status() + self._update_progress_bar() + + # Process logs + with self._lock: + for msg_type, msg in self._logs: + color = Color.DEFAULT + if msg_type == 0: # Debug + color = Color.DEBUG + elif msg_type == 1: # Warning + color = Color.YELLOW + elif msg_type == 2: # Critical + color = Color.RED + self._log_message(msg, color) + self._logs.clear() + + frame += 1 + time.sleep(0.05) # ~20 Hz + + return 0 + + def exec(self) -> int: + return curses.wrapper(self._main_loop) diff --git a/tools/replay/replay.py b/tools/replay/replay.py new file mode 100755 index 00000000000000..df62ae02738751 --- /dev/null +++ b/tools/replay/replay.py @@ -0,0 +1,657 @@ +#!/usr/bin/env python3 +import argparse +import logging +import resource +import sys +import threading +import time +from collections import deque +from collections.abc import Callable +from dataclasses import dataclass, field +from typing import Optional + +import cereal.messaging as messaging +from cereal.services import SERVICE_LIST +from openpilot.common.params import Params + +from openpilot.tools.replay.camera import CameraServer, CameraType +from openpilot.tools.replay.seg_mgr import SegmentManager, ReplayFlags +from openpilot.tools.replay.timeline import Timeline, FindFlag + +log = logging.getLogger("replay") + + +@dataclass +class ReplayStats: + """Tracks timing statistics for playback observability.""" + time_buffer_ns: int = 0 # Current time_diff (positive = ahead of schedule) + _lag_events: deque = field(default_factory=deque) # (timestamp, lag_ns) + _lag_threshold_ns: int = -10_000_000 # -10ms + _window_secs: float = 30.0 + + def record_timing(self, time_diff_ns: int) -> None: + """Record a timing measurement. 
Called from _publish_events.""" + self.time_buffer_ns = time_diff_ns + now = time.monotonic() + + # Record if it's a lag (behind schedule by more than threshold) + if time_diff_ns < self._lag_threshold_ns: + self._lag_events.append((now, time_diff_ns)) + + # Prune entries older than window + cutoff = now - self._window_secs + while self._lag_events and self._lag_events[0][0] < cutoff: + self._lag_events.popleft() + + @property + def lag_count(self) -> int: + """Number of lag events in the rolling window.""" + return len(self._lag_events) + + @property + def worst_lag_ns(self) -> int: + """Most negative time_diff in the rolling window (0 if none).""" + if not self._lag_events: + return 0 + return min(lag for _, lag in self._lag_events) + + +@dataclass +class BenchmarkStats: + """Tracks benchmark timeline events.""" + process_start_ts: int = 0 + timeline: list = field(default_factory=list) # [(timestamp_ns, description)] + + def record(self, description: str) -> None: + self.timeline.append((time.monotonic_ns(), description)) + +DEMO_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19" + + +class Replay: + def __init__(self, route: str, allow: Optional[list[str]] = None, block: Optional[list[str]] = None, + sm=None, flags: int = 0, data_dir: str = ""): + self._sm = sm + self._flags = flags + self._seg_mgr = SegmentManager(route, flags, data_dir) + + allow = allow or [] + block = block or [] + + if not (flags & ReplayFlags.ALL_SERVICES): + block.extend(["bookmarkButton", "uiDebug", "userBookmark"]) + + self._sockets: dict[str, bool] = {} # service name -> enabled + self._pm: Optional[messaging.PubMaster] = None + self._setup_services(allow, block) + self._setup_segment_manager(bool(allow) or bool(block)) + + self._timeline = Timeline() + self._camera_server: Optional[CameraServer] = None + + # Stream thread state + self._stream_thread: Optional[threading.Thread] = None + self._stream_lock = threading.Lock() + self._stream_cv = threading.Condition(self._stream_lock) + self._interrupt = threading.Event() + + self._user_paused = False + self._events_ready = False + self._exit = False + + self._current_segment = 0 + self._seeking_to = -1.0 + self._route_start_ts = 0 + self._cur_mono_time = 0 + self._min_seconds = 0.0 + self._max_seconds = 0.0 + self._speed = 1.0 + self._car_fingerprint = "" + + # Timing stats for observability + self._stats = ReplayStats() + + # Benchmark mode state + self._benchmark_stats = BenchmarkStats() + self._benchmark_stats.process_start_ts = time.monotonic_ns() + self._benchmark_done = False + self._benchmark_cv = threading.Condition() + + # Callbacks + self.on_segments_merged: Optional[Callable[[], None]] = None + self.on_seeking: Optional[Callable[[float], None]] = None + self.on_seeked_to: Optional[Callable[[float], None]] = None + self.on_qlog_loaded: Optional[Callable] = None + self._event_filter: Optional[Callable] = None + + def __del__(self): + if hasattr(self, '_stream_thread') and self._stream_thread is not None and self._stream_thread.is_alive(): + log.info("shutdown: in progress...") + self._interrupt_stream(lambda: setattr(self, '_exit', True) or False) + self._stream_thread.join() + log.info("shutdown: done") + + def _setup_services(self, allow: list[str], block: list[str]) -> None: + active_services = [] + + for name in SERVICE_LIST.keys(): + is_blocked = name in block + is_allowed = not allow or name in allow + if is_allowed and not is_blocked: + self._sockets[name] = True + active_services.append(name) + else: + self._sockets[name] = False + + 
log.info(f"active services: {', '.join(active_services)}") + if not self._sm: + self._pm = messaging.PubMaster(active_services) + + def _setup_segment_manager(self, has_filters: bool) -> None: + self._seg_mgr.set_callback(self._handle_segment_merge) + + @property + def route(self): + return self._seg_mgr.route + + @property + def is_paused(self) -> bool: + return self._user_paused + + @property + def current_seconds(self) -> float: + return (self._cur_mono_time - self._route_start_ts) / 1e9 + + @property + def route_start_nanos(self) -> int: + return self._route_start_ts + + @property + def min_seconds(self) -> float: + return self._min_seconds + + @property + def max_seconds(self) -> float: + return self._max_seconds + + @property + def speed(self) -> float: + return self._speed + + @speed.setter + def speed(self, value: float) -> None: + self._speed = max(0.1, min(8.0, value)) + + @property + def car_fingerprint(self) -> str: + return self._car_fingerprint + + @property + def stats(self) -> ReplayStats: + return self._stats + + @property + def segment_cache_limit(self) -> int: + return self._seg_mgr.segment_cache_limit + + @segment_cache_limit.setter + def segment_cache_limit(self, value: int) -> None: + self._seg_mgr.segment_cache_limit = max(5, value) + + def has_flag(self, flag: ReplayFlags) -> bool: + return bool(self._flags & flag) + + def set_loop(self, loop: bool) -> None: + if loop: + self._flags &= ~ReplayFlags.NO_LOOP + else: + self._flags |= ReplayFlags.NO_LOOP + + def loop(self) -> bool: + return not (self._flags & ReplayFlags.NO_LOOP) + + def get_timeline(self): + return self._timeline.get_entries() + + def find_alert_at_time(self, sec: float): + return self._timeline.find_alert_at_time(sec) + + def get_event_data(self): + return self._seg_mgr.get_event_data() + + def install_event_filter(self, filter_fn: Callable) -> None: + self._event_filter = filter_fn + + @property + def benchmark_stats(self) -> BenchmarkStats: + return self._benchmark_stats + + def wait_for_finished(self) -> None: + """Wait for benchmark mode to complete.""" + with self._benchmark_cv: + self._benchmark_cv.wait_for(lambda: self._benchmark_done) + + def load(self) -> bool: + log.info(f"loading route {self._seg_mgr._route_name}") + if not self._seg_mgr.load(): + return False + + segments = list(self._seg_mgr._segments.keys()) + if segments: + self._min_seconds = min(segments) * 60 + self._max_seconds = (max(segments) + 1) * 60 + + if self.has_flag(ReplayFlags.BENCHMARK): + self._benchmark_stats.record("route metadata loaded") + return True + + def start(self, seconds: int = 0) -> None: + self.seek_to(self._min_seconds + seconds, relative=False) + + def pause(self, pause: bool) -> None: + if self._user_paused != pause: + def update(): + log.info(f"{'paused...' 
if pause else 'resuming'} at {self.current_seconds:.2f} s") + self._user_paused = pause + return not pause + self._interrupt_stream(update) + + def seek_to_flag(self, flag: FindFlag) -> None: + next_time = self._timeline.find(self.current_seconds, flag) + if next_time is not None: + self.seek_to(next_time - 2, relative=False) # seek 2 seconds before + + def seek_to(self, seconds: float, relative: bool) -> None: + target_time = seconds + self.current_seconds if relative else seconds + target_time = max(0.0, target_time) + target_segment = int(target_time / 60) + + if not self._seg_mgr.has_segment(target_segment): + log.warning(f"Invalid seek to {target_time:.2f} s (segment {target_segment})") + return + + log.info(f"Seeking to {int(target_time)} s, segment {target_segment}") + if self.on_seeking: + self.on_seeking(target_time) + + def update(): + self._current_segment = target_segment + self._cur_mono_time = self._route_start_ts + int(target_time * 1e9) + self._seeking_to = target_time + return False + + self._interrupt_stream(update) + self._seg_mgr.set_current_segment(target_segment) + self._check_seek_progress() + + def _interrupt_stream(self, update_fn: Callable[[], bool]) -> None: + self._interrupt.set() + with self._stream_cv: + self._events_ready = update_fn() + if self._user_paused: + self._interrupt.set() + else: + self._interrupt.clear() + self._stream_cv.notify_all() + + def _check_seek_progress(self) -> None: + event_data = self._seg_mgr.get_event_data() + if not event_data.is_segment_loaded(self._current_segment): + return + + seek_to = self._seeking_to + self._seeking_to = -1.0 + if seek_to >= 0 and self.on_seeked_to: + self.on_seeked_to(seek_to) + + # Resume stream + self._interrupt_stream(lambda: True) + + def _handle_segment_merge(self) -> None: + if self._exit: + return + + event_data = self._seg_mgr.get_event_data() + if self._stream_thread is None and event_data.segments: + first_seg = min(event_data.segments.keys()) + self._start_stream(event_data.segments[first_seg]) + + if self.on_segments_merged: + self.on_segments_merged() + + self._interrupt_stream(lambda: False) + self._check_seek_progress() + + def _start_stream(self, segment) -> None: + events = segment.events + if not events: + return + + self._route_start_ts = events[0].logMonoTime + self._cur_mono_time = self._route_start_ts - 1 + + # Write CarParams + for evt in events: + if evt.which() == 'carParams': + self._car_fingerprint = evt.carParams.carFingerprint + try: + params = Params() + car_params_bytes = evt.carParams.as_builder().to_bytes() + params.put("CarParams", car_params_bytes) + params.put("CarParamsPersistent", car_params_bytes) + except Exception as e: + log.warning(f"failed to write CarParams: {e}") + break + + # Start camera server + if not self.has_flag(ReplayFlags.NO_VIPC): + camera_sizes = {} + for cam_name, cam_type in [('road', CameraType.ROAD), ('driver', CameraType.DRIVER), ('wide', CameraType.WIDE_ROAD)]: + if cam_name in segment.frame_readers: + fr = segment.frame_readers[cam_name] + camera_sizes[cam_type] = (fr.w, fr.h) + if camera_sizes: + self._camera_server = CameraServer(camera_sizes) + # Warm the cache before playback to prevent initial stutter + for cam_name in segment.frame_readers: + self._camera_server.warm_cache(segment.frame_readers[cam_name]) + + # Initialize timeline + self._timeline.initialize( + self._seg_mgr.route, + self._route_start_ts, + lambda lr: self.on_qlog_loaded(lr) if self.on_qlog_loaded else None + ) + + if self.has_flag(ReplayFlags.BENCHMARK): + 
self._benchmark_stats.record("streaming started") + + self._stream_thread = threading.Thread(target=self._stream_thread_fn, daemon=True) + self._stream_thread.start() + + def _stream_thread_fn(self) -> None: + benchmark_mode = self.has_flag(ReplayFlags.BENCHMARK) + benchmark_segment_start: Optional[float] = None + benchmark_start_segment = self._current_segment + + while True: + # Hold lock only while checking/updating shared state + with self._stream_lock: + self._stream_cv.wait_for(lambda: self._exit or (self._events_ready and not self._interrupt.is_set())) + if self._exit: + break + + event_data = self._seg_mgr.get_event_data() + events = event_data.events + + # Find first event after current time + first_idx = 0 + for i, evt in enumerate(events): + if evt.logMonoTime > self._cur_mono_time: + first_idx = i + break + else: + log.info("waiting for events...") + self._events_ready = False + continue + + if benchmark_mode and benchmark_segment_start is None: + benchmark_segment_start = time.monotonic() + + # Publish WITHOUT holding lock - allows UI to interrupt quickly + prev_segment = self._current_segment + last_idx = self._publish_events(events, first_idx) + + # Wait for camera frames to be sent + if self._camera_server: + self._camera_server.wait_for_sent() + + # Track segment completion for benchmark + if benchmark_mode and self._current_segment != prev_segment: + elapsed_ms = (time.monotonic() - benchmark_segment_start) * 1000 + realtime_ms = 60 * 1000 # 60 seconds per segment + multiplier = realtime_ms / elapsed_ms if elapsed_ms > 0 else 0 + self._benchmark_stats.record(f"segment {prev_segment} done publishing ({elapsed_ms:.0f} ms, {multiplier:.0f}x realtime)") + benchmark_segment_start = time.monotonic() + + # In benchmark mode, exit after first segment + if prev_segment == benchmark_start_segment: + self._benchmark_stats.record("benchmark done") + with self._benchmark_cv: + self._benchmark_done = True + self._benchmark_cv.notify_all() + break + + # Handle loop + if last_idx >= len(events) and not self.has_flag(ReplayFlags.NO_LOOP): + segments = list(self._seg_mgr._segments.keys()) + if segments and event_data.is_segment_loaded(max(segments)): + log.info("reaches the end of route, restart from beginning") + self.seek_to(self._min_seconds, relative=False) + + def _publish_events(self, events: list, first_idx: int) -> int: + evt_start_ts = self._cur_mono_time + loop_start_ts = time.monotonic_ns() + prev_speed = self._speed + benchmark_mode = self.has_flag(ReplayFlags.BENCHMARK) + + idx = first_idx + while idx < len(events) and not self._interrupt.is_set(): + evt = events[idx] + + # Update current segment + segment = int((evt.logMonoTime - self._route_start_ts) / 1e9 / 60) + if self._current_segment != segment: + self._current_segment = segment + self._seg_mgr.set_current_segment(segment) + # In benchmark mode, return after segment change to allow tracking + if benchmark_mode: + return idx + + self._cur_mono_time = evt.logMonoTime + + # Check if service is enabled + which = evt.which() + if not self._sockets.get(which, False): + idx += 1 + continue + + # Skip timing in benchmark mode for maximum throughput + if not benchmark_mode: + # Timing + current_nanos = time.monotonic_ns() + time_diff = (evt.logMonoTime - evt_start_ts) / self._speed - (current_nanos - loop_start_ts) + + # Record timing stats for observability + self._stats.record_timing(int(time_diff)) + + # Reset timing if needed + if time_diff < -1e9 or time_diff >= 1e9 or self._speed != prev_speed: + evt_start_ts = 
evt.logMonoTime + loop_start_ts = current_nanos + prev_speed = self._speed + elif time_diff > 0: + # Interruptible sleep + wait_secs = time_diff / 1e9 + if self._interrupt.wait(timeout=wait_secs): + break # Interrupted + + if self._interrupt.is_set(): + break + + # Publish message or frame + if which in ('roadEncodeIdx', 'driverEncodeIdx', 'wideRoadEncodeIdx'): + if self._camera_server: + self._publish_frame(evt, which) + else: + self._publish_message(evt) + + idx += 1 + + return idx + + def _publish_message(self, evt) -> None: + if self._event_filter and self._event_filter(evt): + return + + which = evt.which() + if not self._sm: + try: + msg_bytes = evt.as_builder().to_bytes() + self._pm.send(which, msg_bytes) + except Exception as e: + log.warning(f"stop publishing {which} due to error: {e}") + self._sockets[which] = False + + def _publish_frame(self, evt, which: str) -> None: + cam_type = { + 'roadEncodeIdx': CameraType.ROAD, + 'driverEncodeIdx': CameraType.DRIVER, + 'wideRoadEncodeIdx': CameraType.WIDE_ROAD, + }.get(which) + + if cam_type is None: + return + + # Check if camera is enabled + if cam_type == CameraType.DRIVER and not self.has_flag(ReplayFlags.DCAM): + return + if cam_type == CameraType.WIDE_ROAD and not self.has_flag(ReplayFlags.ECAM): + return + + # Get frame reader for this segment + # Note: eidx.segmentId is the local frame index, not the segment number + # The segment number comes from the current playback position + event_data = self._seg_mgr.get_event_data() + + if self._current_segment not in event_data.segments: + return + + seg_data = event_data.segments[self._current_segment] + cam_name = {CameraType.ROAD: 'road', CameraType.DRIVER: 'driver', CameraType.WIDE_ROAD: 'wide'}[cam_type] + if cam_name not in seg_data.frame_readers: + return + + fr = seg_data.frame_readers[cam_name] + if self._speed > 1.0: + self._camera_server.wait_for_sent() + self._camera_server.push_frame(cam_type, fr, evt) + + +def main(): + # Increase file descriptor limit on macOS + if sys.platform == 'darwin': + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (1024, 1024)) + except Exception: + pass + + parser = argparse.ArgumentParser(description='openpilot replay tool') + parser.add_argument('route', nargs='?', default='', help='Route to replay') + parser.add_argument('-a', '--allow', type=str, default='', help='Whitelist of services (comma-separated)') + parser.add_argument('-b', '--block', type=str, default='', help='Blacklist of services (comma-separated)') + parser.add_argument('-c', '--buffer', type=int, default=-1, help='Number of segments to buffer in memory') + parser.add_argument('-s', '--start', type=int, default=0, help='Start from ') + parser.add_argument('-x', '--playback', type=float, default=-1, help='Playback speed') + parser.add_argument('-d', '--data_dir', type=str, default='', help='Local directory with routes') + parser.add_argument('-p', '--prefix', type=str, default='', help='OPENPILOT_PREFIX') + parser.add_argument('--demo', action='store_true', help='Use demo route') + parser.add_argument('--dcam', action='store_true', help='Load driver camera') + parser.add_argument('--ecam', action='store_true', help='Load wide road camera') + parser.add_argument('--no-loop', action='store_true', help='Stop at end of route') + parser.add_argument('--qcam', action='store_true', help='Load qcamera') + parser.add_argument('--no-vipc', action='store_true', help='Do not output video') + parser.add_argument('--all', action='store_true', help='Output all messages') + 
parser.add_argument('--benchmark', action='store_true', help='Run in benchmark mode (process all events then exit with stats)') + parser.add_argument('--headless', action='store_true', help='Run without UI') + + args = parser.parse_args() + + # Determine route + route = args.route + if args.demo: + route = DEMO_ROUTE + if not route: + print("No route provided. Use --help for usage information.") + return 1 + + # Parse flags + flags = ReplayFlags.NONE + if args.dcam: + flags |= ReplayFlags.DCAM + if args.ecam: + flags |= ReplayFlags.ECAM + if args.no_loop: + flags |= ReplayFlags.NO_LOOP + if args.qcam: + flags |= ReplayFlags.QCAMERA + if args.no_vipc: + flags |= ReplayFlags.NO_VIPC + if args.all: + flags |= ReplayFlags.ALL_SERVICES + if args.benchmark: + flags |= ReplayFlags.BENCHMARK + + # Parse allow/block lists + allow = [s.strip() for s in args.allow.split(',') if s.strip()] + block = [s.strip() for s in args.block.split(',') if s.strip()] + + # Set prefix if provided + if args.prefix: + import os + os.environ['OPENPILOT_PREFIX'] = args.prefix + + # Create replay instance + replay = Replay( + route=route, + allow=allow, + block=block, + flags=flags, + data_dir=args.data_dir + ) + + if args.buffer > 0: + replay.segment_cache_limit = args.buffer + + if args.playback > 0: + replay.speed = max(0.2, min(8.0, args.playback)) + + if not replay.load(): + return 1 + + if args.benchmark: + replay.start(args.start) + replay.wait_for_finished() + + stats = replay.benchmark_stats + process_start = stats.process_start_ts + + print("\n===== REPLAY BENCHMARK RESULTS =====") + print(f"Route: {replay.route.name}") + print("\nTIMELINE:") + print(" t=0 ms process start") + for ts, event in stats.timeline: + ms = (ts - process_start) / 1e6 + padding = " " * max(1, 8 - len(str(int(ms)))) + print(f" t={ms:.0f} ms{padding}{event}") + + return 0 + + replay.start(args.start) + + if args.headless: + try: + while True: + time.sleep(5) + print(f"replay: {replay.current_seconds:.1f}s / {replay.max_seconds:.1f}s") + except KeyboardInterrupt: + pass + return 0 + + from openpilot.tools.replay.consoleui import ConsoleUI + console_ui = ConsoleUI(replay) + return console_ui.exec() + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/tools/replay/seg_mgr.py b/tools/replay/seg_mgr.py new file mode 100755 index 00000000000000..cec74d3ee0325b --- /dev/null +++ b/tools/replay/seg_mgr.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python3 +import logging +import threading +from collections.abc import Callable +from dataclasses import dataclass, field +from enum import Enum, IntFlag, auto +from typing import Optional + +from openpilot.selfdrive.test.process_replay.migration import migrate_all +from openpilot.tools.lib.logreader import LogReader +from openpilot.tools.lib.route import Route, Segment +from openpilot.tools.lib.framereader import FrameReader + +log = logging.getLogger("replay") + +MIN_SEGMENTS_CACHE = 5 + + +class ReplayFlags(IntFlag): + NONE = 0x0000 + DCAM = 0x0002 + ECAM = 0x0004 + NO_LOOP = 0x0010 + QCAMERA = 0x0040 + NO_VIPC = 0x0400 + ALL_SERVICES = 0x0800 + BENCHMARK = 0x1000 + + +class LoadState(Enum): + LOADING = auto() + LOADED = auto() + FAILED = auto() + + +@dataclass +class SegmentData: + seg_num: int + segment: Segment + events: list = field(default_factory=list) + frame_readers: dict = field(default_factory=dict) # CameraType -> FrameReader + load_state: LoadState = LoadState.LOADING + + +@dataclass +class EventData: + events: list = field(default_factory=list) + segments: dict = 
field(default_factory=dict) # seg_num -> SegmentData + + def is_segment_loaded(self, n: int) -> bool: + return n in self.segments + + +class SegmentManager: + def __init__(self, route_name: str, flags: int = 0, data_dir: str = ""): + self._flags = flags + self._data_dir = data_dir if data_dir else None + self._route_name = route_name + self._route: Optional[Route] = None + + self._filters: list[bool] = [] + self._segments: dict[int, Optional[SegmentData]] = {} + self._event_data = EventData() + self._merged_segments: set[int] = set() + + self._lock = threading.Lock() + self._cv = threading.Condition(self._lock) + self._thread: Optional[threading.Thread] = None + self._cur_seg_num = -1 + self._needs_update = False + self._exit = False + + self.segment_cache_limit = MIN_SEGMENTS_CACHE + self._on_segment_merged_callback: Optional[Callable[[], None]] = None + + def __del__(self): + if hasattr(self, '_cv'): + with self._cv: + self._exit = True + self._cv.notify_all() + if hasattr(self, '_thread') and self._thread is not None and self._thread.is_alive(): + self._thread.join() + + @property + def route(self) -> Optional[Route]: + return self._route + + def load(self) -> bool: + try: + self._route = Route(self._route_name, data_dir=self._data_dir) + except Exception: + log.exception(f"failed to load route: {self._route_name}") + return False + + # Initialize segment slots for all available segments + for seg in self._route.segments: + seg_num = seg.name.segment_num + if seg.log_path or seg.qlog_path: + self._segments[seg_num] = None + + if not self._segments: + log.error(f"no valid segments in route: {self._route_name}") + return False + + log.info(f"loaded route {self._route_name} with {len(self._segments)} valid segments") + self._thread = threading.Thread(target=self._manage_segment_cache, daemon=True) + self._thread.start() + return True + + def set_current_segment(self, seg_num: int) -> None: + with self._cv: + if self._cur_seg_num == seg_num: + return + self._cur_seg_num = seg_num + self._needs_update = True + self._cv.notify_all() + + def set_callback(self, callback: Callable[[], None]) -> None: + self._on_segment_merged_callback = callback + + def set_filters(self, filters: list[bool]) -> None: + self._filters = filters + + def get_event_data(self) -> EventData: + with self._lock: + return self._event_data + + def has_segment(self, n: int) -> bool: + return n in self._segments + + def _manage_segment_cache(self) -> None: + while True: + with self._cv: + self._cv.wait_for(lambda: self._exit or self._needs_update) + if self._exit: + break + + self._needs_update = False + seg_nums = sorted(self._segments.keys()) + if not seg_nums: + continue + + # Find current segment index + cur_idx = 0 + for i, n in enumerate(seg_nums): + if n >= self._cur_seg_num: + cur_idx = i + break + + # Calculate range to load + half_cache = self.segment_cache_limit // 2 + begin_idx = max(0, cur_idx - half_cache) + end_idx = min(len(seg_nums), begin_idx + self.segment_cache_limit) + begin_idx = max(0, end_idx - self.segment_cache_limit) + + range_seg_nums = seg_nums[begin_idx:end_idx] + + # Load segments in range (outside lock) + self._load_segments_in_range(range_seg_nums, self._cur_seg_num) + merged = self._merge_segments(range_seg_nums) + + # Free segments outside range + with self._lock: + for seg_num in list(self._segments.keys()): + if seg_num not in range_seg_nums: + self._segments[seg_num] = None + + if merged and self._on_segment_merged_callback: + self._on_segment_merged_callback() + + def 
_load_segments_in_range(self, seg_nums: list[int], cur_seg_num: int) -> bool: + """Load segments in range. Returns True if any segment was loaded.""" + # Load forward from current, then backward + forward = [n for n in seg_nums if n >= cur_seg_num] + backward = [n for n in seg_nums if n < cur_seg_num][::-1] + loaded_any = False + + for seg_num in forward + backward: + with self._lock: + if self._exit: + return loaded_any + if self._segments.get(seg_num) is not None: + continue + + # Load segment (blocking - downloads and parses) + log.info(f"loading segment {seg_num}...") + seg_data = self._load_segment(seg_num) + with self._cv: + self._segments[seg_num] = seg_data + self._needs_update = True + self._cv.notify_all() + loaded_any = True + log.info(f"segment {seg_num} loaded with {len(seg_data.events)} events") + + # Only load one segment at a time to be responsive + return loaded_any + + return loaded_any + + def _load_segment(self, seg_num: int) -> SegmentData: + # Find the segment object from route + segment = None + for seg in self._route.segments: + if seg.name.segment_num == seg_num: + segment = seg + break + + if segment is None: + return SegmentData(seg_num=seg_num, segment=None, load_state=LoadState.FAILED) + + seg_data = SegmentData(seg_num=seg_num, segment=segment) + + # Load log events + log_path = segment.log_path or segment.qlog_path + if log_path: + try: + lr = LogReader(log_path) + # Apply schema migrations + events = list(migrate_all(lr)) + seg_data.events = sorted(events, key=lambda x: x.logMonoTime) + except Exception as e: + log.warning(f"failed to load log for segment {seg_num}: {e}") + seg_data.load_state = LoadState.FAILED + return seg_data + + # Load frame readers based on flags + # VisionIPC expects NV12 format + try: + # cache_size=90 holds 3 GOPs (30 frames each) to stay ahead at 8x speed + if segment.camera_path and not (self._flags & ReplayFlags.NO_VIPC): + if not (self._flags & ReplayFlags.QCAMERA): + seg_data.frame_readers['road'] = FrameReader(segment.camera_path, pix_fmt='nv12', cache_size=90) + + if segment.dcamera_path and (self._flags & ReplayFlags.DCAM): + seg_data.frame_readers['driver'] = FrameReader(segment.dcamera_path, pix_fmt='nv12', cache_size=90) + + if segment.ecamera_path and (self._flags & ReplayFlags.ECAM): + seg_data.frame_readers['wide'] = FrameReader(segment.ecamera_path, pix_fmt='nv12', cache_size=90) + + if segment.qcamera_path and (self._flags & ReplayFlags.QCAMERA): + seg_data.frame_readers['qcam'] = FrameReader(segment.qcamera_path, pix_fmt='nv12', cache_size=90) + except Exception as e: + log.warning(f"failed to load frames for segment {seg_num}: {e}") + # Don't fail the whole segment, just skip frames + + seg_data.load_state = LoadState.LOADED + return seg_data + + def _merge_segments(self, seg_nums: list[int]) -> bool: + segments_to_merge = set() + total_events = [] + + with self._lock: + for seg_num in seg_nums: + seg_data = self._segments.get(seg_num) + if seg_data and seg_data.load_state == LoadState.LOADED: + segments_to_merge.add(seg_num) + + if segments_to_merge == self._merged_segments: + return False + + # Merge events from all loaded segments + merged_event_data = EventData() + for seg_num in sorted(segments_to_merge): + with self._lock: + seg_data = self._segments.get(seg_num) + if seg_data is None: + continue + + events = seg_data.events + if not events: + continue + + # Skip initData if present (first event) + start_idx = 0 + if events and events[0].which() == 'initData': + start_idx = 1 + + 
total_events.extend(events[start_idx:]) + merged_event_data.segments[seg_num] = seg_data + + # Sort all events by time + merged_event_data.events = sorted(total_events, key=lambda x: x.logMonoTime) + + with self._lock: + self._event_data = merged_event_data + self._merged_segments = segments_to_merge + + log.debug(f"merged segments: {sorted(segments_to_merge)}") + return True diff --git a/tools/replay/timeline.py b/tools/replay/timeline.py new file mode 100755 index 00000000000000..51e8853c458821 --- /dev/null +++ b/tools/replay/timeline.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +import threading +from collections.abc import Callable +from dataclasses import dataclass +from enum import Enum, auto +from typing import Optional + +from openpilot.tools.lib.logreader import LogReader +from openpilot.selfdrive.test.process_replay.migration import migrate_all + + +class TimelineType(Enum): + NONE = auto() + ENGAGED = auto() + ALERT_INFO = auto() + ALERT_WARNING = auto() + ALERT_CRITICAL = auto() + USER_BOOKMARK = auto() + + +class FindFlag(Enum): + NEXT_ENGAGEMENT = auto() + NEXT_DISENGAGEMENT = auto() + NEXT_USER_BOOKMARK = auto() + NEXT_INFO = auto() + NEXT_WARNING = auto() + NEXT_CRITICAL = auto() + + +@dataclass +class TimelineEntry: + start_time: float + end_time: float + type: TimelineType + text1: str = "" + text2: str = "" + + +class Timeline: + def __init__(self): + self._entries: list[TimelineEntry] = [] + self._lock = threading.Lock() + self._thread: Optional[threading.Thread] = None + self._should_exit = threading.Event() + + def __del__(self): + self._should_exit.set() + if self._thread is not None and self._thread.is_alive(): + self._thread.join() + + def initialize(self, route, route_start_ts: int, + callback: Callable[[LogReader], None]) -> None: + self._thread = threading.Thread( + target=self._build_timeline, + args=(route, route_start_ts, callback), + daemon=True + ) + self._thread.start() + + def find(self, cur_ts: float, flag: FindFlag) -> Optional[float]: + with self._lock: + entries = list(self._entries) + + for entry in entries: + if entry.type == TimelineType.ENGAGED: + if flag == FindFlag.NEXT_ENGAGEMENT and entry.start_time > cur_ts: + return entry.start_time + elif flag == FindFlag.NEXT_DISENGAGEMENT and entry.end_time > cur_ts: + return entry.end_time + elif entry.start_time > cur_ts: + if (flag == FindFlag.NEXT_USER_BOOKMARK and entry.type == TimelineType.USER_BOOKMARK) or \ + (flag == FindFlag.NEXT_INFO and entry.type == TimelineType.ALERT_INFO) or \ + (flag == FindFlag.NEXT_WARNING and entry.type == TimelineType.ALERT_WARNING) or \ + (flag == FindFlag.NEXT_CRITICAL and entry.type == TimelineType.ALERT_CRITICAL): + return entry.start_time + return None + + def find_alert_at_time(self, target_time: float) -> Optional[TimelineEntry]: + with self._lock: + entries = list(self._entries) + + for entry in entries: + if entry.start_time > target_time: + break + if entry.end_time >= target_time and entry.type in ( + TimelineType.ALERT_INFO, TimelineType.ALERT_WARNING, TimelineType.ALERT_CRITICAL): + return entry + return None + + def get_entries(self) -> list[TimelineEntry]: + with self._lock: + return list(self._entries) + + def _build_timeline(self, route, route_start_ts: int, + callback: Callable[[LogReader], None]) -> None: + current_engaged_idx: Optional[int] = None + current_alert_idx: Optional[int] = None + staging_entries: list[TimelineEntry] = [] + + for segment in route.segments: + if self._should_exit.is_set(): + break + + qlog_path = segment.qlog_path + if 
qlog_path is None: + continue + + try: + lr = LogReader(qlog_path) + except Exception: + continue + + for msg in migrate_all(lr): + if self._should_exit.is_set(): + break + + seconds = (msg.logMonoTime - route_start_ts) / 1e9 + + if msg.which() == 'selfdriveState': + ss = msg.selfdriveState + current_engaged_idx = self._update_engagement_status( + ss.enabled, current_engaged_idx, seconds, staging_entries) + current_alert_idx = self._update_alert_status( + ss.alertSize, ss.alertStatus, ss.alertText1, ss.alertText2, + current_alert_idx, seconds, staging_entries) + elif msg.which() == 'userBookmark': + staging_entries.append(TimelineEntry( + start_time=seconds, + end_time=seconds, + type=TimelineType.USER_BOOKMARK + )) + + # Sort and update the timeline entries after each segment + sorted_entries = sorted(staging_entries, key=lambda e: e.start_time) + with self._lock: + self._entries = sorted_entries + + callback(lr) + + def _update_engagement_status(self, enabled: bool, idx: Optional[int], seconds: float, + entries: list[TimelineEntry]) -> Optional[int]: + if idx is not None: + entries[idx].end_time = seconds + + if enabled: + if idx is None: + idx = len(entries) + entries.append(TimelineEntry( + start_time=seconds, + end_time=seconds, + type=TimelineType.ENGAGED + )) + else: + idx = None + return idx + + def _update_alert_status(self, alert_size, alert_status, text1: str, text2: str, + idx: Optional[int], seconds: float, + entries: list[TimelineEntry]) -> Optional[int]: + # Map alertStatus enum to TimelineType + status_map = { + 'normal': TimelineType.ALERT_INFO, + 'userPrompt': TimelineType.ALERT_WARNING, + 'critical': TimelineType.ALERT_CRITICAL, + } + + entry = entries[idx] if idx is not None else None + if entry is not None: + entry.end_time = seconds + + # Check if there's an active alert (alertSize != NONE means alertSize > 0) + # alertSize is an enum: none=0, small=1, mid=2, full=3 + if str(alert_size) != 'none': + status_str = str(alert_status) + alert_type = status_map.get(status_str, TimelineType.ALERT_INFO) + + if entry is None or entry.type != alert_type or entry.text1 != text1 or entry.text2 != text2: + idx = len(entries) + entries.append(TimelineEntry( + start_time=seconds, + end_time=seconds, + type=alert_type, + text1=text1, + text2=text2 + )) + else: + idx = None + return idx
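Note (illustrative, not part of the patch): a quick worked example of the VENUS/NV12 alignment math that get_nv12_info() in camera.py implements. The 1928x1208 frame size is assumed here purely for illustration.

# Sketch only, not part of this diff. Assumes a 1928x1208 frame as an
# example input to the alignment math in get_nv12_info().
width, height = 1928, 1208

y_stride = (width + 127) & ~127      # align width up to 128 -> 2048
y_scanlines = (height + 31) & ~31    # align height up to 32  -> 1216
buffer_size = 2346 * y_stride        # v4l2-derived buffer size -> 4804608

# The repacked frame (Y plane + interleaved UV plane) must fit in the buffer:
uv_scanlines = (height // 2 + 31) & ~31
packed = y_stride * y_scanlines + y_stride * uv_scanlines
assert packed <= buffer_size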
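Note (also illustrative, not part of the patch): a minimal sketch of driving the Replay class programmatically instead of through main(), using only APIs introduced in this diff; the service names, start offset, and timing values are arbitrary examples.

# Sketch only: exercise the playback-control API added in replay.py.
import time

from openpilot.tools.replay.replay import Replay, DEMO_ROUTE
from openpilot.tools.replay.seg_mgr import ReplayFlags
from openpilot.tools.replay.timeline import FindFlag

replay = Replay(DEMO_ROUTE, allow=["carState", "carControl"], flags=ReplayFlags.NO_LOOP)
if replay.load():
    replay.start(seconds=30)                       # seek 30 s into the route and begin streaming
    replay.speed = 2.0                             # clamped to [0.1, 8.0] by the speed setter
    time.sleep(10)                                 # let the timeline thread and stream run for a bit
    replay.seek_to_flag(FindFlag.NEXT_ENGAGEMENT)  # jump to the next engagement, if the timeline has one
    replay.pause(True)                             # freeze playback at the new position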