Skip to content

Commit 89ada29

Browse files
authored
fix(tests): stop LCM thread leak in cross-wall planning tests (#2068)
1 parent d2e695b commit 89ada29

10 files changed

Lines changed: 58 additions & 54 deletions

File tree

bin/pytest-slow

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,4 +3,4 @@
33
set -euo pipefail
44

55
. .venv/bin/activate
6-
exec pytest --numprocesses=auto "$@" -m 'not (tool or mujoco)' dimos
6+
exec pytest "$@" -m 'not (tool or mujoco)' dimos

dimos/core/coordination/test_worker.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -190,6 +190,9 @@ class CapturingLogger:
190190
def log_stats(self, coordinator, workers):
191191
captured.append(workers)
192192

193+
def stop(self):
194+
pass
195+
193196
monitor = StatsMonitor(manager, resource_logger=CapturingLogger(), interval=0.5)
194197
monitor.start()
195198
import time

dimos/core/resource_monitor/logger.py

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,10 +28,15 @@
2828
class ResourceLogger(Protocol):
2929
def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: ...
3030

31+
def stop(self) -> None: ...
32+
3133

3234
class StructlogResourceLogger:
3335
"""Default implementation — logs resource stats via structlog info."""
3436

37+
def stop(self) -> None:
38+
pass
39+
3540
def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None:
3641
logger.info(
3742
"coordinator",
@@ -65,6 +70,9 @@ def __init__(self, topic: str = "/dimos/resource_stats") -> None:
6570

6671
self._transport: pLCMTransport[dict[str, Any]] = pLCMTransport(topic)
6772

73+
def stop(self) -> None:
74+
self._transport.stop()
75+
6876
def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None:
6977
self._transport.broadcast(
7078
None,

dimos/core/resource_monitor/monitor.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -95,6 +95,7 @@ def stop(self) -> None:
9595
if self._thread is not None:
9696
self._thread.join(timeout=5.0)
9797
self._thread = None
98+
self._logger.stop()
9899

99100
def _loop(self) -> None:
100101
while not self._stop.wait(self._interval):

dimos/navigation/nav_stack/tests/conftest.py

Lines changed: 35 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -22,14 +22,13 @@
2222

2323
from __future__ import annotations
2424

25+
import asyncio
2526
import math
2627
from pathlib import Path
27-
import threading
2828
import time
2929

3030
import lcm as lcmlib
3131

32-
from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT
3332
from dimos.core.coordination.blueprints import Blueprint
3433
from dimos.core.coordination.module_coordinator import ModuleCoordinator
3534
from dimos.msgs.geometry_msgs.PointStamped import PointStamped
@@ -76,13 +75,11 @@ def _clear_precomputed_paths() -> None:
7675
path.unlink(missing_ok=True)
7776

7877

79-
def run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None = None) -> None:
80-
"""Build the coordinator, drive the cross-wall waypoint sequence, tear down."""
78+
async def _run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None) -> None:
8179
_clear_precomputed_paths()
8280

8381
coordinator = ModuleCoordinator.build(blueprint)
8482

85-
lock = threading.Lock()
8683
odom_count = 0
8784
robot_x = 0.0
8885
robot_y = 0.0
@@ -94,52 +91,43 @@ def run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None
9491
def _odom_handler(_channel: str, data: bytes) -> None:
9592
nonlocal odom_count, robot_x, robot_y, robot_z, max_z_seen
9693
msg = Odometry.lcm_decode(data)
97-
with lock:
98-
odom_count += 1
99-
robot_x = msg.x
100-
robot_y = msg.y
101-
robot_z = msg.pose.position.z
102-
if robot_z > max_z_seen:
103-
max_z_seen = robot_z
94+
odom_count += 1
95+
robot_x = msg.x
96+
robot_y = msg.y
97+
robot_z = msg.pose.position.z
98+
if robot_z > max_z_seen:
99+
max_z_seen = robot_z
104100

105101
subscription = lcm.subscribe(ODOM_TOPIC, _odom_handler)
106102

107-
lcm_stop = threading.Event()
103+
loop = asyncio.get_running_loop()
104+
lcm_fd = lcm.fileno()
108105

109-
def _lcm_loop() -> None:
110-
while not lcm_stop.is_set():
111-
try:
112-
lcm.handle_timeout(100)
113-
except Exception:
114-
# Don't spin forever waiting on odom that will never arrive.
115-
logger.exception("LCM handle_timeout failed; stopping loop")
116-
lcm_stop.set()
117-
return
106+
def _on_lcm_readable() -> None:
107+
try:
108+
lcm.handle()
109+
except Exception:
110+
logger.exception("LCM handle failed; removing reader to stop further polling")
111+
loop.remove_reader(lcm_fd)
118112

119-
lcm_thread = threading.Thread(target=_lcm_loop, daemon=True)
120-
lcm_thread.start()
113+
loop.add_reader(lcm_fd, _on_lcm_readable)
121114

122115
try:
123116
logger.info(f"[{label}] Blueprint started, waiting for odom…")
124117

125118
deadline = time.monotonic() + ODOM_WAIT_SEC
126-
while time.monotonic() < deadline:
127-
with lock:
128-
if odom_count > 0:
129-
break
130-
time.sleep(0.5)
119+
while time.monotonic() < deadline and odom_count == 0:
120+
await asyncio.sleep(0.5)
131121

132-
with lock:
133-
assert odom_count > 0, f"No odometry received after {ODOM_WAIT_SEC}s — sim not running?"
134-
initial_x, initial_y = robot_x, robot_y
122+
assert odom_count > 0, f"No odometry received after {ODOM_WAIT_SEC}s — sim not running?"
123+
initial_x, initial_y = robot_x, robot_y
135124

136125
logger.info(f"[{label}] Odom online. Robot at ({initial_x:.2f}, {initial_y:.2f})")
137126
logger.info(f"[{label}] Warming up for {WARMUP_SEC}s…")
138-
time.sleep(WARMUP_SEC)
127+
await asyncio.sleep(WARMUP_SEC)
139128

140129
for name, goal_x, goal_y, goal_z, timeout_sec, threshold in CROSS_WALL_WAYPOINTS:
141-
with lock:
142-
start_x, start_y = robot_x, robot_y
130+
start_x, start_y = robot_x, robot_y
143131

144132
logger.info(
145133
f"[{label}] === {name}: goal ({goal_x}, {goal_y}) | "
@@ -156,10 +144,9 @@ def _lcm_loop() -> None:
156144
current_x, current_y = start_x, start_y
157145
distance = _distance(current_x, current_y, goal_x, goal_y)
158146
while True:
159-
with lock:
160-
current_x, current_y = robot_x, robot_y
161-
current_z = robot_z
162-
current_max_z = max_z_seen
147+
current_x, current_y = robot_x, robot_y
148+
current_z = robot_z
149+
current_max_z = max_z_seen
163150

164151
if max_z is not None:
165152
assert current_z <= max_z, (
@@ -176,24 +163,25 @@ def _lcm_loop() -> None:
176163
break
177164
if elapsed >= timeout_sec:
178165
break
179-
time.sleep(GOAL_POLL_INTERVAL_SEC)
166+
await asyncio.sleep(GOAL_POLL_INTERVAL_SEC)
180167

181168
assert reached, (
182169
f"{name}: robot did not reach ({goal_x}, {goal_y}) within {timeout_sec}s. "
183170
f"Final pos=({current_x:.2f}, {current_y:.2f}), dist={distance:.2f}m"
184171
)
185172

186173
if max_z is not None:
187-
with lock:
188-
final_max_z = max_z_seen
189-
assert final_max_z <= max_z, (
190-
f"Robot z peaked at {final_max_z:.2f}m during the run "
174+
assert max_z_seen <= max_z, (
175+
f"Robot z peaked at {max_z_seen:.2f}m during the run "
191176
f"(limit {max_z}m) — went through the ceiling"
192177
)
193178

194179
finally:
195-
lcm_stop.set()
196-
lcm_thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT)
197-
assert not lcm_thread.is_alive(), "LCM loop thread didn't exit cleanly"
180+
loop.remove_reader(lcm_fd)
198181
lcm.unsubscribe(subscription)
199182
coordinator.stop()
183+
184+
185+
def run_cross_wall_test(blueprint: Blueprint, *, label: str, max_z: float | None = None) -> None:
186+
"""Build the coordinator, drive the cross-wall waypoint sequence, tear down."""
187+
asyncio.run(_run_cross_wall_test(blueprint, label=label, max_z=max_z))

dimos/navigation/nav_stack/tests/test_cross_wall_planning_far.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@
2626
unitree_g1_nav_sim,
2727
)
2828

29-
pytestmark = [pytest.mark.skipif_in_ci, pytest.mark.skipif_macos]
29+
pytestmark = [pytest.mark.self_hosted, pytest.mark.skipif_in_ci, pytest.mark.skipif_macos]
3030

3131

3232
class TestCrossWallPlanning:
@@ -36,5 +36,5 @@ def test_cross_wall_sequence(self) -> None:
3636
blueprint = autoconnect(
3737
unitree_g1_nav_sim,
3838
create_nav_stack(**{**nav_config, "planner": "far"}),
39-
).global_config(dtop=True)
39+
).global_config()
4040
run_cross_wall_test(blueprint, label="far")

dimos/navigation/nav_stack/tests/test_cross_wall_planning_simple.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@
2626
unitree_g1_nav_sim,
2727
)
2828

29-
pytestmark = [pytest.mark.skipif_in_ci, pytest.mark.skipif_macos]
29+
pytestmark = [pytest.mark.self_hosted, pytest.mark.skipif_in_ci, pytest.mark.skipif_macos]
3030

3131

3232
class TestCrossWallPlanningSimple:
@@ -36,5 +36,5 @@ def test_cross_wall_sequence_simple(self) -> None:
3636
blueprint = autoconnect(
3737
unitree_g1_nav_sim,
3838
create_nav_stack(**{**nav_config, "planner": "simple"}),
39-
).global_config(dtop=True)
39+
).global_config()
4040
run_cross_wall_test(blueprint, label="simple")

dimos/simulation/unity/test_unity_sim.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,8 @@
5454
_is_linux_x86 = platform.system() == "Linux" and platform.machine() in ("x86_64", "AMD64")
5555
_has_display = bool(os.environ.get("DISPLAY"))
5656

57+
pytestmark = pytest.mark.self_hosted
58+
5759

5860
class _MockTransport:
5961
def __init__(self):

dimos/visualization/rerun/bridge.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -166,7 +166,9 @@ def _default_blueprint() -> Blueprint:
166166
class Config(ModuleConfig):
167167
pubsubs: list[SubscribeAllCapable[Any, Any]] = field(default_factory=lambda: [LCM()])
168168

169-
visual_override: dict[Glob | str, Callable[[Any], Archetype]] = field(default_factory=dict)
169+
visual_override: dict[Glob | str, Callable[[Any], Archetype] | None] = field(
170+
default_factory=dict
171+
)
170172
static: dict[str, Callable[[Any], Archetype]] = field(default_factory=dict)
171173
max_hz: dict[str, float] = field(default_factory=dict)
172174

docs/development/testing.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ The default `addopts` in `pyproject.toml` includes a `-m` filter that excludes `
5555
./bin/pytest-slow
5656
```
5757

58-
(Shortcut for `pytest --numprocesses=auto -m 'not (tool or mujoco)' dimos` — runs the default suite *and* self-hosted tests, but not `tool` or `mujoco`.)
58+
(Shortcut for `pytest -m 'not (tool or mujoco)' dimos` — runs the default suite *and* self-hosted tests, but not `tool` or `mujoco`.)
5959

6060
When writing or debugging a specific self-hosted test, override `-m` yourself to run it:
6161

0 commit comments

Comments
 (0)