Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ Once you have installed the `robot-log-visualizer` you can run it from the termi
You can navigate the dataset thanks to the slider or by pressing `Ctrl-f` and `Ctrl-b` to move
forward and backward.

To pre-load a dataset on startup, pass it as an argument:

```bash
robot-log-visualizer /path/to/dataset.mat
```

If you saved a snapshot of your favourite layout, you can restore it right away:

```bash
robot-log-visualizer --snapshot /path/to/view.json
```

You can also combine both parameters; in that case the dataset argument is loaded first and then
the snapshot is applied:

```bash
robot-log-visualizer /path/to/dataset.mat --snapshot /path/to/view.json
```

> [!IMPORTANT]
> `robot-log-visualizer` only supports reading `.mat` file [version 7.3 or newer](https://www.mathworks.com/help/matlab/import_export/mat-file-versions.html).

Expand Down
36 changes: 35 additions & 1 deletion robot_log_visualizer/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
# This software may be modified and distributed under the terms of the
# Released under the terms of the BSD 3-Clause License

import argparse
import pathlib
import sys
from typing import Optional, Sequence

# GUI
from qtpy.QtWidgets import QApplication
Expand All @@ -14,7 +17,28 @@
from robot_log_visualizer.robot_visualizer.meshcat_provider import MeshcatProvider


def main():
def _parse_arguments(argv: Sequence[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Robot Log Visualizer",
)
parser.add_argument(
"dataset",
nargs="?",
help="Path to a MAT dataset to load on startup.",
)
parser.add_argument(
"-s",
"--snapshot",
help="Path to a view snapshot (.json) to restore on startup.",
)

return parser.parse_args(argv)


def main(argv: Optional[Sequence[str]] = None):
parsed_argv = list(argv) if argv is not None else sys.argv[1:]
args = _parse_arguments(parsed_argv)

thread_periods = {
"meshcat_provider": 0.03,
"signal_provider": 0.03,
Expand All @@ -36,6 +60,16 @@ def main():
# show the main window
gui.show()

if args.dataset:
dataset_path = pathlib.Path(args.dataset).expanduser()
if not gui._load_mat_file(str(dataset_path), quiet=False): # noqa: SLF001
print(f"Failed to load dataset '{dataset_path}'.", file=sys.stderr)

if args.snapshot:
snapshot_path = pathlib.Path(args.snapshot).expanduser()
if not gui.load_view_snapshot_from_path(snapshot_path):
print(f"Failed to load snapshot '{snapshot_path}'.", file=sys.stderr)

exec_method = getattr(app, "exec", None)
if exec_method is None:
exec_method = app.exec_
Expand Down
205 changes: 191 additions & 14 deletions robot_log_visualizer/plotter/pyqtgraph_viewer_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from __future__ import annotations

from typing import Dict, Iterable, Sequence, Tuple
from typing import Any, Dict, Iterable, List, Sequence, Tuple

import numpy as np
import pyqtgraph as pg # type: ignore
Expand Down Expand Up @@ -57,6 +57,7 @@ def __init__(
self._curves: Dict[str, pg.PlotDataItem] = {}
self._annotations: Dict[Point, pg.TextItem] = {}
self._markers: Dict[Point, pg.ScatterPlotItem] = {}
self._annotation_sources: Dict[Point, str] = {}
self._palette: Iterable = ColorPalette()

# UI set‑up
Expand Down Expand Up @@ -85,21 +86,23 @@ def set_signal_provider(self, signal_provider) -> None:
self._update_realtime_curves
)

def update_plots(self, paths: Sequence[Path], legends: Sequence[Legend]) -> None:
def update_plots(
self, paths: Sequence[Path], legends: Sequence[Legend]
) -> List[str]:
"""Synchronise plots with the *paths* list.

New items are added, disappeared items removed. Existing ones are
left untouched to avoid flicker.
"""
if self._signal_provider is None:
return
return []

# For real-time provider, update the set of selected signals to buffer
if self._signal_provider.provider_type == ProviderType.REALTIME:
selected_keys = ["::".join(path) for path in paths]
self._signal_provider.add_signals_to_buffer(selected_keys)

self._add_missing_curves(paths, legends)
missing_paths = self._add_missing_curves(paths, legends)
self._remove_obsolete_curves(paths)

# Set the X axis range based on the provider type
Expand All @@ -117,6 +120,8 @@ def update_plots(self, paths: Sequence[Path], legends: Sequence[Legend]) -> None
self._signal_provider.end_time - self._signal_provider.initial_time,
)

return missing_paths

# The following trio is wired to whoever controls the replay/stream
def pause_animation(self) -> None: # noqa: D401
"""Pause the vertical‑line animation."""
Expand Down Expand Up @@ -173,23 +178,46 @@ def _connect_signals(self) -> None:

def _add_missing_curves(
self, paths: Sequence[Path], legends: Sequence[Legend]
) -> None:
) -> List[str]:
"""Plot curves that are present in *paths* but not yet on screen."""

missing: List[str] = []
for path, legend in zip(paths, legends):
key = "/".join(path)
if key in self._curves:
continue

# Drill down to the data array
data = self._signal_provider.data
for subkey in path[:-1]:
data = data[subkey]
try:
y = data["data"][:, int(path[-1])]
except (IndexError, ValueError): # scalar variable
y = data["data"][:]
data = self._signal_provider.data
for subkey in path[:-1]:
data = data[subkey]

data_array = data["data"]
timestamps = data["timestamps"]
except (KeyError, TypeError):
missing.append(key)
continue

try:
y = data_array[:, int(path[-1])]
except (
IndexError,
ValueError,
TypeError,
): # scalar variable or invalid index
try:
y = data_array[:]
except Exception:
missing.append(key)
continue

try:
x = timestamps - self._signal_provider.initial_time
except Exception:
missing.append(key)
continue

x = data["timestamps"] - self._signal_provider.initial_time
palette_color = next(self._palette)
pen = pg.mkPen(palette_color.as_hex(), width=2)

Expand All @@ -201,12 +229,21 @@ def _add_missing_curves(
symbol=None,
)

return missing

def _remove_obsolete_curves(self, paths: Sequence[Path]) -> None:
"""Delete curves that disappeared from *paths*."""
valid = {"/".join(p) for p in paths}
for key in [k for k in self._curves if k not in valid]:
self._plot.removeItem(self._curves.pop(key))

# Remove annotations associated to the deleted curve
orphan_points = [
pt for pt, src in self._annotation_sources.items() if src == key
]
for point in orphan_points:
self._deselect(point)

def _update_vline(self) -> None:
"""Move the vertical line to ``current_time``."""
if self._signal_provider is None:
Expand Down Expand Up @@ -279,10 +316,11 @@ def _on_mouse_click(self, event) -> None: # noqa: N802
self._deselect(candidate)
else:
assert nearest_curve is not None # mypy‑friendly
self._select(candidate, nearest_curve.opts["pen"])
self._select(candidate, nearest_curve)

def _select(self, pt: Point, pen: pg.QtGui.QPen) -> None:
def _select(self, pt: Point, curve: pg.PlotDataItem) -> None:
"""Add label + circle marker on *pt*."""
pen = curve.opts["pen"]
x_span = np.diff(self._plot.viewRange()[0])[0]
y_span = np.diff(self._plot.viewRange()[1])[0]
x_prec = max(0, int(-np.log10(max(x_span, 1e-12))) + 2)
Expand Down Expand Up @@ -313,14 +351,153 @@ def _select(self, pt: Point, pen: pg.QtGui.QPen) -> None:
self._plot.addItem(marker)
self._markers[pt] = marker

curve_key = self._curve_key(curve)
if curve_key is not None:
self._annotation_sources[pt] = curve_key

def _deselect(self, pt: Point) -> None:
"""Remove annotation + marker on *pt*."""
self._plot.removeItem(self._annotations.pop(pt))
self._plot.removeItem(self._markers.pop(pt))
self._annotation_sources.pop(pt, None)

def clear_selections(self) -> None: # noqa: D401
"""Remove **all** annotations and markers."""
for item in (*self._annotations.values(), *self._markers.values()):
self._plot.removeItem(item)
self._annotations.clear()
self._markers.clear()
self._annotation_sources.clear()

def clear_curves(self) -> None:
"""Remove every plotted curve and related markers."""
for key in list(self._curves.keys()):
self._plot.removeItem(self._curves.pop(key))
self.clear_selections()

def capture_state(self) -> Dict[str, Any]:
"""Return a snapshot of the current canvas configuration."""

annotations: List[Dict[str, Any]] = []
for pt, label in self._annotations.items():
source = self._annotation_sources.get(pt)
if source is None:
continue
annotations.append(
{
"path": source,
"point": [float(pt[0]), float(pt[1])],
"label": label.toPlainText(),
}
)

curve_meta: Dict[str, Dict[str, Any]] = {}
for key, curve in self._curves.items():
pen_info: Dict[str, Any] = {}
pen = curve.opts.get("pen")
if pen is not None:
qcol = pen.color()
pen_info["color"] = (
f"#{qcol.red():02x}{qcol.green():02x}{qcol.blue():02x}"
)
pen_info["width"] = pen.width()
curve_meta[key] = {
"label": curve.opts.get("name", ""),
"pen": pen_info,
}

return {
"curves": curve_meta,
"view_range": self._plot.viewRange(),
"legend_visible": bool(self._plot.plotItem.legend.isVisible()),
"annotations": annotations,
}

def apply_view_range(self, view_range: Sequence[Sequence[float]]) -> None:
"""Restore the axes limits from a snapshot."""

if len(view_range) != 2:
return

x_range, y_range = view_range
if len(x_range) == 2:
self._plot.setXRange(float(x_range[0]), float(x_range[1]), padding=0)
if len(y_range) == 2:
self._plot.setYRange(float(y_range[0]), float(y_range[1]), padding=0)

def set_legend_visible(self, visible: bool) -> None:
"""Toggle legend visibility."""

legend = getattr(self._plot.plotItem, "legend", None)
if legend is not None:
legend.setVisible(bool(visible))

def restore_annotations(self, annotations: Sequence[Dict[str, Any]]) -> List[str]:
"""Re-create selection markers from saved data.

Returns:
List of curve identifiers that could not be restored.
"""

missing: List[str] = []
for ann in annotations:
key = ann.get("path")
point = ann.get("point")
if key is None or point is None:
continue
curve = self._curves.get(str(key))
if curve is None:
missing.append(str(key))
continue
try:
pt_tuple: Point = (float(point[0]), float(point[1]))
except (TypeError, ValueError, IndexError):
missing.append(str(key))
continue
self._select(pt_tuple, curve)
return missing

def _curve_key(self, curve: pg.PlotDataItem) -> str | None:
Copy link

Copilot AI Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return type annotation uses the newer union syntax str | None which may not be compatible with older Python versions. Consider using Optional[str] for broader compatibility.

Copilot uses AI. Check for mistakes.
for key, item in self._curves.items():
if item is curve:
return key
return None

def apply_curve_metadata(self, metadata: Dict[str, Dict[str, Any]]) -> None:
for key, info in metadata.items():
curve = self._curves.get(key)
if curve is None:
continue

label = info.get("label")
if label is not None:
label_text = str(label)
if hasattr(curve, "setName"):
curve.setName(label_text)
else:
curve.opts["name"] = label_text
legend = getattr(self._plot.plotItem, "legend", None)
if legend is not None:
if hasattr(legend, "itemChanged"):
legend.itemChanged(curve)
else: # compatibility fallback for older pyqtgraph releases
try:
legend.removeItem(curve)
except Exception:
pass
legend.addItem(curve, label_text)

pen_info = info.get("pen", {})
color = pen_info.get("color")
width = pen_info.get("width")
if color is not None or width is not None:
kwargs: Dict[str, Any] = {}
if color is not None:
kwargs["color"] = color
if width is not None:
try:
kwargs["width"] = float(width)
except (TypeError, ValueError):
pass
if kwargs:
curve.setPen(pg.mkPen(**kwargs))
Loading