Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 70 additions & 76 deletions selfdrive/car/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import os
import pytest
import random
import unittest # noqa: TID251
from collections import defaultdict, Counter
import hypothesis.strategies as st
from hypothesis import Phase, given, settings
Expand Down Expand Up @@ -60,7 +59,7 @@ def get_test_cases() -> list[tuple[str, CarTestRoute | None]]:

@pytest.mark.slow
@pytest.mark.shared_download_cache
class TestCarModelBase(unittest.TestCase):
class TestCarModelBase:
platform: Platform | None = None
test_route: CarTestRoute | None = None

Expand All @@ -69,13 +68,12 @@ class TestCarModelBase(unittest.TestCase):
elm_frame: int | None
car_safety_mode_frame: int | None

@classmethod
def get_testing_data_from_logreader(cls, lr):
def get_testing_data_from_logreader(self, lr):
car_fw = []
can_msgs = []
cls.elm_frame = None
cls.car_safety_mode_frame = None
cls.fingerprint = gen_empty_fingerprint()
self.elm_frame = None
self.car_safety_mode_frame = None
self.fingerprint = gen_empty_fingerprint()
alpha_long = False
for msg in lr:
if msg.which() == "can":
Expand All @@ -84,82 +82,80 @@ def get_testing_data_from_logreader(cls, lr):
if len(can_msgs) <= FRAME_FINGERPRINT:
for m in msg.can:
if m.src < 64:
cls.fingerprint[m.src][m.address] = len(m.dat)
self.fingerprint[m.src][m.address] = len(m.dat)

elif msg.which() == "carParams":
car_fw = msg.carParams.carFw
if msg.carParams.openpilotLongitudinalControl:
alpha_long = True
if cls.platform is None:
if self.platform is None:
live_fingerprint = msg.carParams.carFingerprint
cls.platform = MIGRATION.get(live_fingerprint, live_fingerprint)
self.platform = MIGRATION.get(live_fingerprint, live_fingerprint)

# Log which can frame the panda safety mode left ELM327, for CAN validity checks
elif msg.which() == 'pandaStates':
for ps in msg.pandaStates:
if cls.elm_frame is None and ps.safetyModel != SafetyModel.elm327:
cls.elm_frame = len(can_msgs)
if cls.car_safety_mode_frame is None and ps.safetyModel not in \
if self.elm_frame is None and ps.safetyModel != SafetyModel.elm327:
self.elm_frame = len(can_msgs)
if self.car_safety_mode_frame is None and ps.safetyModel not in \
(SafetyModel.elm327, SafetyModel.noOutput):
cls.car_safety_mode_frame = len(can_msgs)
self.car_safety_mode_frame = len(can_msgs)

elif msg.which() == 'pandaStateDEPRECATED':
if cls.elm_frame is None and msg.pandaStateDEPRECATED.safetyModel != SafetyModel.elm327:
cls.elm_frame = len(can_msgs)
if cls.car_safety_mode_frame is None and msg.pandaStateDEPRECATED.safetyModel not in \
if self.elm_frame is None and msg.pandaStateDEPRECATED.safetyModel != SafetyModel.elm327:
self.elm_frame = len(can_msgs)
if self.car_safety_mode_frame is None and msg.pandaStateDEPRECATED.safetyModel not in \
(SafetyModel.elm327, SafetyModel.noOutput):
cls.car_safety_mode_frame = len(can_msgs)
self.car_safety_mode_frame = len(can_msgs)

assert len(can_msgs) > int(50 / DT_CTRL), "no can data found"
return car_fw, can_msgs, alpha_long

@classmethod
def get_testing_data(cls):
def get_testing_data(self):
test_segs = (2, 1, 0)
if cls.test_route.segment is not None:
test_segs = (cls.test_route.segment,)
if self.test_route.segment is not None:
test_segs = (self.test_route.segment,)

for seg in test_segs:
segment_range = f"{cls.test_route.route}/{seg}"
segment_range = f"{self.test_route.route}/{seg}"

try:
sources = [internal_source] if len(INTERNAL_SEG_LIST) else [openpilotci_source, comma_api_source]
lr = LogReader(segment_range, sources=sources, sort_by_time=True)
return cls.get_testing_data_from_logreader(lr)
return self.get_testing_data_from_logreader(lr)
except (LogsUnavailable, AssertionError):
pass

raise Exception(f"Route: {repr(cls.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded and public?")
raise Exception(f"Route: {repr(self.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded and public?")

@pytest.fixture(autouse=True, scope="class")
def setup_class(self):
if self.__class__.__name__ == 'TestCarModel' or self.__class__.__name__.endswith('Base'):
pytest.skip("Base class or TestCarModel template")

@classmethod
def setUpClass(cls):
if cls.__name__ == 'TestCarModel' or cls.__name__.endswith('Base'):
raise unittest.SkipTest
if self.test_route is None:
if self.platform in non_tested_cars:
print(f"Skipping tests for {self.platform}: missing route")
pytest.skip(f"Skipping tests for {self.platform}: missing route")
raise Exception(f"missing test route for {self.platform}")

if cls.test_route is None:
if cls.platform in non_tested_cars:
print(f"Skipping tests for {cls.platform}: missing route")
raise unittest.SkipTest
raise Exception(f"missing test route for {cls.platform}")

car_fw, cls.can_msgs, alpha_long = cls.get_testing_data()
car_fw, self.can_msgs, alpha_long = self.get_testing_data()

# if relay is expected to be open in the route
cls.openpilot_enabled = cls.car_safety_mode_frame is not None
self.openpilot_enabled = self.car_safety_mode_frame is not None

cls.CarInterface = interfaces[cls.platform]
cls.CP = cls.CarInterface.get_params(cls.platform, cls.fingerprint, car_fw, alpha_long, False, docs=False)
assert cls.CP
assert cls.CP.carFingerprint == cls.platform
self.CarInterface = interfaces[self.platform]
self.CP = self.CarInterface.get_params(self.platform, self.fingerprint, car_fw, alpha_long, False, docs=False)
assert self.CP
assert self.CP.carFingerprint == self.platform

os.environ["COMMA_CACHE"] = DEFAULT_DOWNLOAD_CACHE_ROOT
yield
if hasattr(self, 'can_msgs'):
del self.can_msgs

@classmethod
def tearDownClass(cls):
del cls.can_msgs

def setUp(self):
@pytest.fixture(autouse=True)
def setup_method(self):
self.CI = self.CarInterface(self.CP.copy())
assert self.CI

Expand All @@ -168,22 +164,22 @@ def setUp(self):

cfg = self.CP.safetyConfigs[-1]
set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam)
self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}")
assert 0 == set_status, f"failed to set safetyModel {cfg}"
self.safety.init_tests()

def test_car_params(self):
if self.CP.dashcamOnly:
self.skipTest("no need to check carParams for dashcamOnly")
pytest.skip("no need to check carParams for dashcamOnly")

# make sure car params are within a valid range
self.assertGreater(self.CP.mass, 1)
assert self.CP.mass > 1

if self.CP.steerControlType != SteerControlType.angle:
tuning = self.CP.lateralTuning.which()
if tuning == 'pid':
self.assertTrue(len(self.CP.lateralTuning.pid.kpV))
assert len(self.CP.lateralTuning.pid.kpV)
elif tuning == 'torque':
self.assertTrue(self.CP.lateralTuning.torque.latAccelFactor > 0)
assert self.CP.lateralTuning.torque.latAccelFactor > 0
else:
raise Exception("unknown tuning")

Expand All @@ -200,7 +196,7 @@ def test_car_interface(self):
if i > 250:
can_invalid_cnt += not CS.canValid

self.assertEqual(can_invalid_cnt, 0)
assert can_invalid_cnt == 0

def test_radar_interface(self):
RI = self.CarInterface.RadarInterface(self.CP)
Expand All @@ -213,11 +209,11 @@ def test_radar_interface(self):
rr: structs.RadarData | None = RI.update(msg)
if rr is not None and i > 50:
error_cnt += rr.errors.canError
self.assertEqual(error_cnt, 0)
assert error_cnt == 0

def test_panda_safety_rx_checks(self):
if self.CP.dashcamOnly:
self.skipTest("no need to check panda safety for dashcamOnly")
pytest.skip("no need to check panda safety for dashcamOnly")

start_ts = self.can_msgs[0][0]

Expand All @@ -239,29 +235,29 @@ def test_panda_safety_rx_checks(self):
# ensure all msgs defined in the addr checks are valid
self.safety.safety_tick_current_safety_config()
if t > 1e6:
self.assertTrue(self.safety.safety_config_valid())
assert self.safety.safety_config_valid()

# Don't check relay malfunction on disabled routes (relay closed),
# or before fingerprinting is done (elm327 and noOutput)
if self.openpilot_enabled and t / 1e4 > self.car_safety_mode_frame:
self.assertFalse(self.safety.get_relay_malfunction())
assert not self.safety.get_relay_malfunction()
else:
self.safety.set_relay_malfunction(False)

self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}")
assert not len(failed_addrs), f"panda safety RX check failed: {failed_addrs}"

# ensure RX checks go invalid after small time with no traffic
self.safety.set_timer(int(t + (2*1e6)))
self.safety.safety_tick_current_safety_config()
self.assertFalse(self.safety.safety_config_valid())
assert not self.safety.safety_config_valid()

def test_panda_safety_tx_cases(self, data=None):
"""Asserts we can tx common messages"""
if self.CP.dashcamOnly:
self.skipTest("no need to check panda safety for dashcamOnly")
pytest.skip("no need to check panda safety for dashcamOnly")

if self.CP.notCar:
self.skipTest("Skipping test for notCar")
pytest.skip("Skipping test for notCar")

def test_car_controller(car_control):
now_nanos = 0
Expand All @@ -275,10 +271,10 @@ def test_car_controller(car_control):
msgs_sent += len(sendcan)
for addr, dat, bus in sendcan:
to_send = libsafety_py.make_CANPacket(addr, bus % 4, dat)
self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus))
assert self.safety.safety_tx_hook(to_send), (addr, dat, bus)

# Make sure we attempted to send messages
self.assertGreater(msgs_sent, 50)
assert msgs_sent > 50

# Make sure we can send all messages while inactive
CC = structs.CarControl()
Expand Down Expand Up @@ -306,7 +302,7 @@ def test_panda_safety_carstate_fuzzy(self, data):
"""

if self.CP.dashcamOnly:
self.skipTest("no need to check panda safety for dashcamOnly")
pytest.skip("no need to check panda safety for dashcamOnly")

valid_addrs = [(addr, bus, size) for bus, addrs in self.fingerprint.items() for addr, size in addrs.items()]
address, bus, size = data.draw(st.sampled_from(valid_addrs))
Expand Down Expand Up @@ -339,7 +335,7 @@ def test_panda_safety_carstate_fuzzy(self, data):
continue

if self.safety.get_gas_pressed_prev() != prev_panda_gas:
self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev())
assert CS.gasPressed == self.safety.get_gas_pressed_prev()

if self.safety.get_brake_pressed_prev() != prev_panda_brake:
# TODO: remove this exception once this mismatch is resolved
Expand All @@ -348,16 +344,16 @@ def test_panda_safety_carstate_fuzzy(self, data):
if self.CP.carFingerprint in (HONDA.HONDA_PILOT, HONDA.HONDA_RIDGELINE) and CS.brake > 0.05:
brake_pressed = False

self.assertEqual(brake_pressed, self.safety.get_brake_pressed_prev())
assert brake_pressed == self.safety.get_brake_pressed_prev()

if self.safety.get_regen_braking_prev() != prev_panda_regen_braking:
self.assertEqual(CS.regenBraking, self.safety.get_regen_braking_prev())
assert CS.regenBraking == self.safety.get_regen_braking_prev()

if self.safety.get_steering_disengage_prev() != prev_panda_steering_disengage:
self.assertEqual(CS.steeringDisengage, self.safety.get_steering_disengage_prev())
assert CS.steeringDisengage == self.safety.get_steering_disengage_prev()

if self.safety.get_vehicle_moving() != prev_panda_vehicle_moving and not self.CP.notCar:
self.assertEqual(not CS.standstill, self.safety.get_vehicle_moving())
assert not CS.standstill == self.safety.get_vehicle_moving()

# check vehicle speed if angle control car or available
if self.safety.get_vehicle_speed_min() > 0 or self.safety.get_vehicle_speed_max() > 0:
Expand All @@ -366,23 +362,23 @@ def test_panda_safety_carstate_fuzzy(self, data):
if vehicle_speed_seen and (self.safety.get_vehicle_speed_min() != prev_panda_vehicle_speed_min or
self.safety.get_vehicle_speed_max() != prev_panda_vehicle_speed_max):
v_ego_raw = CS.vEgoRaw / self.CP.wheelSpeedFactor
self.assertFalse(v_ego_raw > (self.safety.get_vehicle_speed_max() + 1e-3) or
assert not (v_ego_raw > (self.safety.get_vehicle_speed_max() + 1e-3) or
v_ego_raw < (self.safety.get_vehicle_speed_min() - 1e-3))

if not (self.CP.brand == "honda" and not (self.CP.flags & HondaFlags.BOSCH)):
if self.safety.get_cruise_engaged_prev() != prev_panda_cruise_engaged:
self.assertEqual(CS.cruiseState.enabled, self.safety.get_cruise_engaged_prev())
assert CS.cruiseState.enabled == self.safety.get_cruise_engaged_prev()

if self.CP.brand == "honda":
if self.safety.get_acc_main_on() != prev_panda_acc_main_on:
self.assertEqual(CS.cruiseState.available, self.safety.get_acc_main_on())
assert CS.cruiseState.available == self.safety.get_acc_main_on()

def test_panda_safety_carstate(self):
"""
Assert that panda safety matches openpilot's carState
"""
if self.CP.dashcamOnly:
self.skipTest("no need to check panda safety for dashcamOnly")
pytest.skip("no need to check panda safety for dashcamOnly")

# warm up pass, as initial states may be different
for can in self.can_msgs[:300]:
Expand All @@ -400,7 +396,7 @@ def test_panda_safety_carstate(self):
for msg in filter(lambda m: m.src < 64, can[1]):
to_send = libsafety_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
ret = self.safety.safety_rx_hook(to_send)
self.assertEqual(1, ret, f"safety rx failed ({ret=}): {(msg.address, msg.src % 4)}")
assert 1 == ret, f"safety rx failed ({ret=}): {(msg.address, msg.src % 4)}"

# Skip first frame so CS_prev is properly initialized
if idx == 0:
Expand Down Expand Up @@ -462,7 +458,7 @@ def test_panda_safety_carstate(self):
CS_prev = CS

failed_checks = {k: v for k, v in checks.items() if v > 0}
self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}")
assert not len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}"


@parameterized_class(('platform', 'test_route'), get_test_cases())
Expand All @@ -471,5 +467,3 @@ class TestCarModel(TestCarModelBase):
pass


if __name__ == "__main__":
unittest.main()
7 changes: 4 additions & 3 deletions selfdrive/ui/mici/layouts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def _setup_callbacks(self):
device.add_interactive_timeout_callback(self._set_mode_for_started)

def _scroll_to(self, layout: Widget):
layout_x = int(layout.rect.x)
# Scroll to the absolute layout_x position (negated because scroll_panel uses negative offsets for content)
layout_x = -int(layout.rect.x)
self._scroller.scroll_to(layout_x, smooth=True)

def _render(self, _):
Expand All @@ -83,9 +84,9 @@ def _render(self, _):

if not self._setup:
if self._alerts_layout.active_alerts() > 0:
self._scroller.scroll_to(self._alerts_layout.rect.x)
self._scroller.scroll_to(-self._alerts_layout.rect.x)
else:
self._scroller.scroll_to(self._rect.width)
self._scroller.scroll_to(-self._rect.width)
self._setup = True

# Render
Expand Down
2 changes: 1 addition & 1 deletion selfdrive/ui/mici/widgets/dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def _on_option_selected(self, option: str):
y_pos = rect_center_y - (btn.rect.y + height / 2)
break

self._scroller.scroll_to(-y_pos)
self._scroller.scroll_to(y_pos)

def _selected_option_changed(self):
pass
Expand Down
13 changes: 4 additions & 9 deletions system/ui/widgets/scroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,12 @@ def __init__(self, items: list[Widget], horizontal: bool = True, snap_items: boo
def set_reset_scroll_at_show(self, scroll: bool):
self._reset_scroll_at_show = scroll

def scroll_to(self, pos: float, smooth: bool = False):
# already there
if abs(pos) < 1:
return

# FIXME: the padding correction doesn't seem correct
scroll_offset = self.scroll_panel.get_offset() - pos
def scroll_to(self, target_offset: float, smooth: bool = False):
"""Scrolls to an absolute offset."""
if smooth:
self._scrolling_to = scroll_offset
self._scrolling_to = float(target_offset)
else:
self.scroll_panel.set_offset(scroll_offset)
self.scroll_panel.set_offset(float(target_offset))

@property
def is_auto_scrolling(self) -> bool:
Expand Down
Loading