Skip to content

Commit f8cbf98

Browse files
Fix possible LCM URL collision (#2077)
1 parent 7ae197b commit f8cbf98

2 files changed

Lines changed: 25 additions & 33 deletions

File tree

dimos/protocol/pubsub/impl/test_lcmpubsub.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from collections.abc import Generator
15+
from collections.abc import Iterator
1616
import time
1717
from typing import Any
1818

@@ -29,32 +29,28 @@
2929
)
3030
from dimos.utils.testing.collector import CallbackCollector
3131

32-
# Isolated multicast group so stale messages from other tests
33-
# (which use the default 239.255.76.67:7667) don't leak in.
34-
_ISOLATED_LCM_URL = "udpm://239.255.76.98:7698?ttl=0"
35-
3632

3733
@pytest.fixture
38-
def lcm_pub_sub_base() -> Generator[LCMPubSubBase, None, None]:
39-
lcm = LCMPubSubBase(url=_ISOLATED_LCM_URL)
34+
def lcm_pub_sub_base(lcm_url: str) -> Iterator[LCMPubSubBase]:
35+
lcm = LCMPubSubBase(url=lcm_url)
4036
lcm.start()
4137
time.sleep(0.05) # let the handler thread enter the LCM loop
4238
yield lcm
4339
lcm.stop()
4440

4541

4642
@pytest.fixture
47-
def pickle_lcm() -> Generator[PickleLCM, None, None]:
48-
lcm = PickleLCM(url=_ISOLATED_LCM_URL)
43+
def pickle_lcm(lcm_url: str) -> Iterator[PickleLCM]:
44+
lcm = PickleLCM(url=lcm_url)
4945
lcm.start()
5046
time.sleep(0.05) # let the handler thread enter the LCM loop
5147
yield lcm
5248
lcm.stop()
5349

5450

5551
@pytest.fixture
56-
def lcm() -> Generator[LCM, None, None]:
57-
lcm = LCM(url=_ISOLATED_LCM_URL)
52+
def lcm(lcm_url: str) -> Iterator[LCM]:
53+
lcm = LCM(url=lcm_url)
5854
lcm.start()
5955
time.sleep(0.05) # let the handler thread enter the LCM loop
6056
yield lcm

dimos/protocol/pubsub/test_pattern_sub.py

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
"""Grid tests for subscribe_all pattern subscriptions."""
1717

18-
from collections.abc import Callable, Generator
18+
from collections.abc import Callable, Iterator
1919
from contextlib import AbstractContextManager, contextmanager
2020
from dataclasses import dataclass, field
2121
import re
@@ -44,22 +44,18 @@ class Case(Generic[TopicT, MsgT]):
4444
"""Test case for grid testing pubsub implementations."""
4545

4646
name: str
47-
pubsub_context: Callable[[], AbstractContextManager[PubSubPair[TopicT, MsgT]]]
47+
pubsub_context: Callable[[str], AbstractContextManager[PubSubPair[TopicT, MsgT]]]
4848
topic_values: list[tuple[TopicT, MsgT]]
4949
tags: set[str] = field(default_factory=set)
5050
# Pattern tests: (pattern_topic, {indices of topic_values that should match})
5151
glob_patterns: list[tuple[TopicT, set[int]]] = field(default_factory=list)
5252
regex_patterns: list[tuple[TopicT, set[int]]] = field(default_factory=list)
5353

5454

55-
# Use an isolated multicast group to avoid cross-test LCM contamination.
56-
_ISOLATED_LCM_URL = "udpm://239.255.76.99:7699?ttl=0"
57-
58-
5955
@contextmanager
60-
def lcm_typed_context() -> Generator[tuple[LCM, LCM], None, None]:
61-
pub = LCM(url=_ISOLATED_LCM_URL)
62-
sub = LCM(url=_ISOLATED_LCM_URL)
56+
def lcm_typed_context(url: str) -> Iterator[tuple[LCM, LCM]]:
57+
pub = LCM(url=url)
58+
sub = LCM(url=url)
6359
pub.start()
6460
sub.start()
6561
try:
@@ -70,9 +66,9 @@ def lcm_typed_context() -> Generator[tuple[LCM, LCM], None, None]:
7066

7167

7268
@contextmanager
73-
def lcm_bytes_context() -> Generator[tuple[LCMPubSubBase, LCMPubSubBase], None, None]:
74-
pub = LCMPubSubBase(url=_ISOLATED_LCM_URL)
75-
sub = LCMPubSubBase(url=_ISOLATED_LCM_URL)
69+
def lcm_bytes_context(url: str) -> Iterator[tuple[LCMPubSubBase, LCMPubSubBase]]:
70+
pub = LCMPubSubBase(url=url)
71+
sub = LCMPubSubBase(url=url)
7672
pub.start()
7773
sub.start()
7874
try:
@@ -142,11 +138,11 @@ def _topic_matches_prefix(topic: Any, prefix: str = "/") -> bool:
142138

143139

144140
@pytest.mark.parametrize("tc", all_cases, ids=lambda c: c.name)
145-
def test_subscribe_all_receives_all_topics(tc: Case[Any, Any]) -> None:
141+
def test_subscribe_all_receives_all_topics(tc: Case[Any, Any], lcm_url: str) -> None:
146142
"""Test that subscribe_all receives messages from all topics."""
147143
collector = CallbackCollector(len(tc.topic_values))
148144

149-
with tc.pubsub_context() as (pub, sub):
145+
with tc.pubsub_context(lcm_url) as (pub, sub):
150146
sub.subscribe_all(collector)
151147
time.sleep(0.01) # Allow subscription to register
152148

@@ -164,12 +160,12 @@ def test_subscribe_all_receives_all_topics(tc: Case[Any, Any]) -> None:
164160

165161

166162
@pytest.mark.parametrize("tc", all_cases, ids=lambda c: c.name)
167-
def test_subscribe_all_unsubscribe(tc: Case[Any, Any]) -> None:
163+
def test_subscribe_all_unsubscribe(tc: Case[Any, Any], lcm_url: str) -> None:
168164
"""Test that unsubscribe stops receiving messages."""
169165
collector = CallbackCollector(1)
170166
topic, value = tc.topic_values[0]
171167

172-
with tc.pubsub_context() as (pub, sub):
168+
with tc.pubsub_context(lcm_url) as (pub, sub):
173169
unsub = sub.subscribe_all(collector)
174170
time.sleep(0.01) # Allow subscription to register
175171

@@ -185,14 +181,14 @@ def test_subscribe_all_unsubscribe(tc: Case[Any, Any]) -> None:
185181

186182

187183
@pytest.mark.parametrize("tc", all_cases, ids=lambda c: c.name)
188-
def test_subscribe_all_with_regular_subscribe(tc: Case[Any, Any]) -> None:
184+
def test_subscribe_all_with_regular_subscribe(tc: Case[Any, Any], lcm_url: str) -> None:
189185
"""Test that subscribe_all coexists with regular subscriptions."""
190186
all_collector = CallbackCollector(2)
191187
specific_received: list[tuple[Any, Any]] = []
192188
topic1, value1 = tc.topic_values[0]
193189
topic2, value2 = tc.topic_values[1]
194190

195-
with tc.pubsub_context() as (pub, sub):
191+
with tc.pubsub_context(lcm_url) as (pub, sub):
196192
sub.subscribe_all(
197193
lambda msg, topic: all_collector(msg, topic) if _topic_matches_prefix(topic) else None
198194
)
@@ -212,12 +208,12 @@ def test_subscribe_all_with_regular_subscribe(tc: Case[Any, Any]) -> None:
212208

213209

214210
@pytest.mark.parametrize("tc", glob_cases, ids=lambda c: c.name)
215-
def test_subscribe_glob(tc: Case[Any, Any]) -> None:
211+
def test_subscribe_glob(tc: Case[Any, Any], lcm_url: str) -> None:
216212
"""Test that glob pattern subscriptions receive only matching topics."""
217213
for pattern_topic, expected_indices in tc.glob_patterns:
218214
collector = CallbackCollector(len(expected_indices))
219215

220-
with tc.pubsub_context() as (pub, sub):
216+
with tc.pubsub_context(lcm_url) as (pub, sub):
221217
sub.subscribe(pattern_topic, collector)
222218
time.sleep(0.01) # Allow subscription to register
223219

@@ -238,12 +234,12 @@ def test_subscribe_glob(tc: Case[Any, Any]) -> None:
238234

239235

240236
@pytest.mark.parametrize("tc", regex_cases, ids=lambda c: c.name)
241-
def test_subscribe_regex(tc: Case[Any, Any]) -> None:
237+
def test_subscribe_regex(tc: Case[Any, Any], lcm_url: str) -> None:
242238
"""Test that regex pattern subscriptions receive only matching topics."""
243239
for pattern_topic, expected_indices in tc.regex_patterns:
244240
collector = CallbackCollector(len(expected_indices))
245241

246-
with tc.pubsub_context() as (pub, sub):
242+
with tc.pubsub_context(lcm_url) as (pub, sub):
247243
sub.subscribe(pattern_topic, collector)
248244
time.sleep(0.01) # Allow subscription to register
249245

0 commit comments

Comments
 (0)