From 46c5c7410d77a862a375f1cc337529fd16b42f29 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 Aug 2025 12:43:10 +0000 Subject: [PATCH 1/4] Initial plan From 2900dd67ea092182678ff59b62fa09b4a80b9d1e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 Aug 2025 12:51:10 +0000 Subject: [PATCH 2/4] Implement ZeroMQ transport with unit tests Co-authored-by: gonzalocasas <933277+gonzalocasas@users.noreply.github.com> --- src/compas_eve/__init__.py | 9 ++ src/compas_eve/zeromq/__init__.py | 254 ++++++++++++++++++++++++++++++ tests/unit/test_zeromq.py | 146 +++++++++++++++++ 3 files changed, 409 insertions(+) create mode 100644 src/compas_eve/zeromq/__init__.py create mode 100644 tests/unit/test_zeromq.py diff --git a/src/compas_eve/__init__.py b/src/compas_eve/__init__.py index 8dadd667..20964e7e 100644 --- a/src/compas_eve/__init__.py +++ b/src/compas_eve/__init__.py @@ -20,6 +20,7 @@ EchoSubscriber Transport InMemoryTransport + ZeroMQTransport get_default_transport set_default_transport @@ -49,6 +50,11 @@ ) from .memory import InMemoryTransport +try: + from .zeromq import ZeroMQTransport +except ImportError: + ZeroMQTransport = None + HERE = os.path.dirname(__file__) HOME = os.path.abspath(os.path.join(HERE, "../../")) @@ -66,4 +72,7 @@ "set_default_transport", "InMemoryTransport", ] + +if ZeroMQTransport is not None: + __all__.append("ZeroMQTransport") __all_plugins__ = ["compas_eve.rhino.install"] diff --git a/src/compas_eve/zeromq/__init__.py b/src/compas_eve/zeromq/__init__.py new file mode 100644 index 00000000..57f6f8ab --- /dev/null +++ b/src/compas_eve/zeromq/__init__.py @@ -0,0 +1,254 @@ +""" +******************************************************************************** +compas_eve.zeromq +******************************************************************************** + +.. currentmodule:: compas_eve.zeromq + + +Classes +======= + +.. autosummary:: + :toctree: generated/ + :nosignatures: + + ZeroMQTransport + +""" + +from ..core import Transport +from ..event_emitter import EventEmitterMixin + +try: + import zmq +except ImportError: + zmq = None + +__all__ = ["ZeroMQTransport"] + + +class ZeroMQTransport(Transport, EventEmitterMixin): + """ZeroMQ transport allows sending and receiving messages using ZeroMQ pub/sub sockets. + + Parameters + ---------- + endpoint : str + Endpoint for the pub/sub communication, e.g. ``tcp://localhost:5555`` or ``inproc://test``. + Publishers will connect to this endpoint, subscribers will bind to it. + """ + + def __init__(self, endpoint, *args, **kwargs): + if zmq is None: + raise ImportError("pyzmq is required for ZeroMQ transport. Please install it with: pip install pyzmq") + + super(ZeroMQTransport, self).__init__(*args, **kwargs) + + self.endpoint = endpoint + self._is_connected = False + self._local_callbacks = {} + + # Create ZeroMQ context and sockets + self.context = zmq.Context() + self.pub_socket = self.context.socket(zmq.PUB) + self.sub_socket = self.context.socket(zmq.SUB) + + # Publisher connects to endpoint, subscriber binds to it + # This allows multiple publishers to connect to one subscriber endpoint + try: + self.sub_socket.bind(self.endpoint) + self.pub_socket.connect(self.endpoint) + except zmq.ZMQError as e: + # If bind fails, try the reverse (useful for tcp endpoints that might be in use) + try: + self.pub_socket.connect(self.endpoint) + self.sub_socket.connect(self.endpoint) + except zmq.ZMQError: + raise e + + # Set up polling for subscriber + self.poller = zmq.Poller() + self.poller.register(self.sub_socket, zmq.POLLIN) + + # Mark as connected (ZeroMQ doesn't have explicit connection state) + self._is_connected = True + + # Start polling thread for incoming messages + import threading + self._polling = True + self._poll_thread = threading.Thread(target=self._poll_messages) + self._poll_thread.daemon = True + self._poll_thread.start() + + # Emit ready event + self.emit("ready") + + def close(self): + """Close the ZeroMQ sockets and context.""" + self._polling = False + if hasattr(self, '_poll_thread'): + self._poll_thread.join(timeout=1) + + self.pub_socket.close() + self.sub_socket.close() + self.context.term() + + def _poll_messages(self): + """Poll for incoming messages in a separate thread.""" + while self._polling: + try: + # Poll with timeout to allow thread termination + socks = dict(self.poller.poll(100)) # 100ms timeout + if self.sub_socket in socks: + # Receive topic and message + topic_bytes = self.sub_socket.recv(zmq.NOBLOCK) + message_bytes = self.sub_socket.recv(zmq.NOBLOCK) + + topic_name = topic_bytes.decode('utf-8') + message_str = message_bytes.decode('utf-8') + + # Emit the message event + event_key = "event:{}".format(topic_name) + self.emit(event_key, message_str) + + except zmq.Again: + # No message available, continue polling + continue + except Exception as e: + # Emit error but continue polling + self.emit("error", e) + + def on_ready(self, callback): + """Allows to hook-up to the event triggered when the transport is ready. + + Parameters + ---------- + callback : function + Function to invoke when the connection is established. + """ + if self._is_connected: + callback() + else: + self.once("ready", callback) + + def publish(self, topic, message): + """Publish a message to a topic. + + Parameters + ---------- + topic : :class:`Topic` + Instance of the topic to publish to. + message : :class:`Message` + Instance of the message to publish. + """ + def _callback(**kwargs): + json_message = topic._message_to_json(message) + + # Send topic and message as separate frames + self.pub_socket.send_string(topic.name, zmq.SNDMORE) + self.pub_socket.send_string(json_message) + + self.on_ready(_callback) + + def subscribe(self, topic, callback): + """Subscribe to a topic. + + Every time a new message is received on the topic, the callback will be invoked. + + Parameters + ---------- + topic : :class:`Topic` + Instance of the topic to subscribe to. + callback : function + Callback to invoke whenever a new message arrives. The callback should + receive only one `msg` argument, e.g. ``lambda msg: print(msg)``. + + Returns + ------- + str + Returns an identifier of the subscription. + """ + event_key = "event:{}".format(topic.name) + subscribe_id = "{}:{}".format(event_key, id(callback)) + + def _local_callback(message_str): + msg = topic._message_from_json(message_str) + callback(msg) + + def _subscribe_callback(**kwargs): + # Subscribe to the topic on ZeroMQ socket + self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, topic.name) + + # Register local callback for this topic + self.on(event_key, _local_callback) + + self._local_callbacks[subscribe_id] = _local_callback + + self.on_ready(_subscribe_callback) + + return subscribe_id + + def advertise(self, topic): + """Announce this code will publish messages to the specified topic. + + This call has no effect on this transport implementation. + + Parameters + ---------- + topic : :class:`Topic` + Instance of the topic to advertise. + + Returns + ------- + str + Advertising identifier. + """ + advertise_id = "advertise:{}:{}".format(topic.name, self.id_counter) + # ZeroMQ does not need explicit advertising + return advertise_id + + def unadvertise(self, topic): + """Announce that this code will stop publishing messages to the specified topic. + + This call has no effect on this transport implementation. + + Parameters + ---------- + topic : :class:`Topic` + Instance of the topic to stop publishing messages to. + """ + pass + + def unsubscribe_by_id(self, subscribe_id): + """Unsubscribe from the specified topic based on the subscription id. + + Parameters + ---------- + subscribe_id : str + Identifier of the subscription. + """ + ev_type, topic_name, _callback_id = subscribe_id.split(":") + event_key = "{}:{}".format(ev_type, topic_name) + + callback = self._local_callbacks[subscribe_id] + self.off(event_key, callback) + + # Unsubscribe from ZeroMQ socket + self.sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, topic_name) + + del self._local_callbacks[subscribe_id] + + def unsubscribe(self, topic): + """Unsubscribe from the specified topic. + + Parameters + ---------- + topic : :class:`Topic` + Instance of the topic to unsubscribe from. + """ + # Unsubscribe from ZeroMQ socket + self.sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, topic.name) + + # Remove all local listeners for this topic + event_key = "event:{}".format(topic.name) + self.remove_all_listeners(event_key) \ No newline at end of file diff --git a/tests/unit/test_zeromq.py b/tests/unit/test_zeromq.py new file mode 100644 index 00000000..a4d09c97 --- /dev/null +++ b/tests/unit/test_zeromq.py @@ -0,0 +1,146 @@ +from threading import Event + +from compas_eve import Message +from compas_eve import Publisher +from compas_eve import Subscriber +from compas_eve import Topic +from compas_eve import set_default_transport + +try: + from compas_eve import ZeroMQTransport + ZEROMQ_AVAILABLE = True +except ImportError: + ZEROMQ_AVAILABLE = False + +import pytest + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_zeromq_import(): + """Test that ZeroMQ transport can be imported.""" + assert ZeroMQTransport is not None + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_default_transport_publishing(): + set_default_transport(ZeroMQTransport("inproc://test1")) + event = Event() + topic = Topic("/messages_compas_eve_test/test_default_transport_publishing/", Message) + + Subscriber(topic, lambda m: event.set()).subscribe() + Publisher(topic).publish(Message(done=True)) + + received = event.wait(timeout=3) + assert received, "Message not received" + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_pubsub(): + tx = ZeroMQTransport("inproc://test2") + event = Event() + topic = Topic("/messages_compas_eve_test/test_pubsub/", Message) + + Subscriber(topic, lambda m: event.set(), transport=tx).subscribe() + Publisher(topic, transport=tx).publish(Message(done=True)) + + received = event.wait(timeout=3) + assert received, "Message not received" + + tx.close() + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_two_subs(): + tx = ZeroMQTransport("inproc://test3") + event1 = Event() + event2 = Event() + topic = Topic("/messages_compas_eve_test/test_two_subs/", Message) + + Subscriber(topic, lambda m: event1.set(), transport=tx).subscribe() + Subscriber(topic, lambda m: event2.set(), transport=tx).subscribe() + Publisher(topic, transport=tx).publish(Message(done=True)) + + received1 = event1.wait(timeout=2) + received2 = event2.wait(timeout=2) + assert received1, "Message 1 not received" + assert received2, "Message 2 not received" + + tx.close() + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_unsub(): + tx = ZeroMQTransport("inproc://test4") + topic = Topic("/messages_compas_eve_test/test_unsub/", Message) + + result = dict(count=0, event=Event()) + + def callback(msg): + result["count"] += 1 + result["event"].set() + + pub = Publisher(topic, transport=tx) + sub = Subscriber(topic, callback, transport=tx) + + sub.subscribe() + pub.publish(Message(done=True)) + received = result["event"].wait(timeout=3) + assert received, "First message not received" + assert len(list(tx._local_callbacks.keys())) == 1, "Internal callback reference should have been kept" + + result["event"].clear() + sub.unsubscribe() + pub.publish(Message(done=True)) + + received = result["event"].wait(timeout=1) + assert received is False, "Second message received but it should have been unsubscribed" + assert result["count"] == 1, "Did not unsubscribe properly" + assert len(list(tx._local_callbacks.keys())) == 0, "Internal callback reference should have been released" + + tx.close() + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_message_type_parsing(): + tx = ZeroMQTransport("inproc://test5") + event = Event() + topic = Topic("/messages_compas_eve_test/test_message_type_parsing/", Message) + + result = {} + + def callback(msg): + result["message"] = msg + event.set() + + Subscriber(topic, callback, transport=tx).subscribe() + Publisher(topic, transport=tx).publish(Message(name="Compas Eve", done=True)) + + received = event.wait(timeout=3) + assert received, "Message not received" + assert result["message"].name == "Compas Eve" + assert result["message"].done is True + + tx.close() + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_dict_as_message(): + tx = ZeroMQTransport("inproc://test6") + event = Event() + topic = Topic("/messages_compas_eve_test/test_dict_as_message/", Message) + + result = {} + + def callback(msg): + result["message"] = msg + event.set() + + Subscriber(topic, callback, transport=tx).subscribe() + Publisher(topic, transport=tx).publish({"name": "Compas Eve", "done": True}) + + received = event.wait(timeout=3) + assert received, "Message not received" + assert result["message"].name == "Compas Eve" + assert result["message"].done is True + + tx.close() \ No newline at end of file From c632b7269ee7eac25820459b120f06d190a3ce18 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 Aug 2025 12:56:05 +0000 Subject: [PATCH 3/4] Complete ZeroMQ transport with integration tests and examples Co-authored-by: gonzalocasas <933277+gonzalocasas@users.noreply.github.com> --- setup.py | 4 +- src/compas_eve/zeromq/__init__.py | 25 ++--- tests/integration/test_zeromq.py | 160 ++++++++++++++++++++++++++++++ 3 files changed, 176 insertions(+), 13 deletions(-) create mode 100644 tests/integration/test_zeromq.py diff --git a/setup.py b/setup.py index 4d7cfeec..1c813174 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,9 @@ def read(*names, **kwargs): long_description = read("README.md") requirements = read("requirements.txt").split("\n") -optional_requirements = {} +optional_requirements = { + "zeromq": ["pyzmq>=19.0"], +} setup( name="compas_eve", diff --git a/src/compas_eve/zeromq/__init__.py b/src/compas_eve/zeromq/__init__.py index 57f6f8ab..34ecd1eb 100644 --- a/src/compas_eve/zeromq/__init__.py +++ b/src/compas_eve/zeromq/__init__.py @@ -35,16 +35,20 @@ class ZeroMQTransport(Transport, EventEmitterMixin): ---------- endpoint : str Endpoint for the pub/sub communication, e.g. ``tcp://localhost:5555`` or ``inproc://test``. - Publishers will connect to this endpoint, subscribers will bind to it. + bind_subscriber : bool, optional + If True, the subscriber socket will bind to the endpoint and publisher will connect. + If False, the publisher will bind to the endpoint and subscriber will connect. + Defaults to True for most use cases. """ - def __init__(self, endpoint, *args, **kwargs): + def __init__(self, endpoint, bind_subscriber=True, *args, **kwargs): if zmq is None: raise ImportError("pyzmq is required for ZeroMQ transport. Please install it with: pip install pyzmq") super(ZeroMQTransport, self).__init__(*args, **kwargs) self.endpoint = endpoint + self.bind_subscriber = bind_subscriber self._is_connected = False self._local_callbacks = {} @@ -53,18 +57,15 @@ def __init__(self, endpoint, *args, **kwargs): self.pub_socket = self.context.socket(zmq.PUB) self.sub_socket = self.context.socket(zmq.SUB) - # Publisher connects to endpoint, subscriber binds to it - # This allows multiple publishers to connect to one subscriber endpoint - try: + # Configure sockets based on bind_subscriber setting + if self.bind_subscriber: + # Subscriber binds, publisher connects - good for many publishers, few subscribers self.sub_socket.bind(self.endpoint) self.pub_socket.connect(self.endpoint) - except zmq.ZMQError as e: - # If bind fails, try the reverse (useful for tcp endpoints that might be in use) - try: - self.pub_socket.connect(self.endpoint) - self.sub_socket.connect(self.endpoint) - except zmq.ZMQError: - raise e + else: + # Publisher binds, subscriber connects - good for one publisher, many subscribers + self.pub_socket.bind(self.endpoint) + self.sub_socket.connect(self.endpoint) # Set up polling for subscriber self.poller = zmq.Poller() diff --git a/tests/integration/test_zeromq.py b/tests/integration/test_zeromq.py new file mode 100644 index 00000000..bb92d84c --- /dev/null +++ b/tests/integration/test_zeromq.py @@ -0,0 +1,160 @@ +from threading import Event + +from compas_eve import Message +from compas_eve import Publisher +from compas_eve import Subscriber +from compas_eve import Topic +from compas_eve import set_default_transport + +try: + from compas_eve import ZeroMQTransport + ZEROMQ_AVAILABLE = True +except ImportError: + ZEROMQ_AVAILABLE = False + +import pytest + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_zeromq_tcp_pubsub(): + """Test ZeroMQ transport with TCP endpoints.""" + tx = ZeroMQTransport("tcp://localhost:25555") + event = Event() + topic = Topic("/messages_compas_eve_test/tcp_pubsub/", Message) + + Subscriber(topic, lambda m: event.set(), transport=tx).subscribe() + + # Small delay to ensure subscriber is ready + import time + time.sleep(0.1) + + Publisher(topic, transport=tx).publish(Message(done=True)) + + received = event.wait(timeout=3) + assert received, "Message not received" + + tx.close() + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_zeromq_tcp_message_content(): + """Test that message content is preserved correctly with TCP transport.""" + tx = ZeroMQTransport("tcp://localhost:25556") + event = Event() + topic = Topic("/messages_compas_eve_test/tcp_content/", Message) + + result = {} + + def callback(msg): + result["message"] = msg + event.set() + + Subscriber(topic, callback, transport=tx).subscribe() + + # Small delay to ensure subscriber is ready + import time + time.sleep(0.1) + + test_message = Message( + name="ZeroMQ Test", + value=42, + nested={"key": "value", "list": [1, 2, 3]} + ) + Publisher(topic, transport=tx).publish(test_message) + + received = event.wait(timeout=3) + assert received, "Message not received" + assert result["message"].name == "ZeroMQ Test" + assert result["message"].value == 42 + assert result["message"].nested["key"] == "value" + assert result["message"].nested["list"] == [1, 2, 3] + + tx.close() + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_zeromq_tcp_multiple_topics(): + """Test that multiple topics work correctly with TCP transport.""" + tx = ZeroMQTransport("tcp://localhost:25557") + + topic1 = Topic("/test/topic1", Message) + topic2 = Topic("/test/topic2", Message) + + event1 = Event() + event2 = Event() + + result = {} + + def callback1(msg): + result["topic1"] = msg + event1.set() + + def callback2(msg): + result["topic2"] = msg + event2.set() + + # Subscribe to both topics + Subscriber(topic1, callback1, transport=tx).subscribe() + Subscriber(topic2, callback2, transport=tx).subscribe() + + # Small delay to ensure subscribers are ready + import time + time.sleep(0.1) + + # Publish to topic1 + Publisher(topic1, transport=tx).publish(Message(source="topic1")) + + # Publish to topic2 + Publisher(topic2, transport=tx).publish(Message(source="topic2")) + + received1 = event1.wait(timeout=3) + received2 = event2.wait(timeout=3) + + assert received1, "Message 1 not received" + assert received2, "Message 2 not received" + assert result["topic1"].source == "topic1" + assert result["topic2"].source == "topic2" + + tx.close() + + +@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") +def test_zeromq_tcp_unsubscribe(): + """Test unsubscribe functionality with TCP transport.""" + tx = ZeroMQTransport("tcp://localhost:25558") + topic = Topic("/test/unsub", Message) + + result = dict(count=0, event=Event()) + + def callback(msg): + result["count"] += 1 + result["event"].set() + + sub = Subscriber(topic, callback, transport=tx) + pub = Publisher(topic, transport=tx) + + # Subscribe and receive first message + sub.subscribe() + + # Small delay to ensure subscriber is ready + import time + time.sleep(0.1) + + pub.publish(Message(seq=1)) + + received = result["event"].wait(timeout=3) + assert received, "First message not received" + assert result["count"] == 1 + + # Unsubscribe + result["event"].clear() + sub.unsubscribe() + + # Publish second message - should not be received + pub.publish(Message(seq=2)) + + received = result["event"].wait(timeout=1) + assert received is False, "Second message received but it should have been unsubscribed" + assert result["count"] == 1, "Message count should still be 1" + + tx.close() \ No newline at end of file From 9ea3789b6e8871f11a95049083ad37787e18c694 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 Aug 2025 15:37:15 +0000 Subject: [PATCH 4/4] Address review feedback: remove ZeroMQ from main init, update error message, fix test imports Co-authored-by: gonzalocasas <933277+gonzalocasas@users.noreply.github.com> --- src/compas_eve/__init__.py | 9 --------- src/compas_eve/zeromq/__init__.py | 2 +- tests/integration/test_zeromq.py | 11 +---------- tests/unit/test_zeromq.py | 14 +------------- 4 files changed, 3 insertions(+), 33 deletions(-) diff --git a/src/compas_eve/__init__.py b/src/compas_eve/__init__.py index 20964e7e..8dadd667 100644 --- a/src/compas_eve/__init__.py +++ b/src/compas_eve/__init__.py @@ -20,7 +20,6 @@ EchoSubscriber Transport InMemoryTransport - ZeroMQTransport get_default_transport set_default_transport @@ -50,11 +49,6 @@ ) from .memory import InMemoryTransport -try: - from .zeromq import ZeroMQTransport -except ImportError: - ZeroMQTransport = None - HERE = os.path.dirname(__file__) HOME = os.path.abspath(os.path.join(HERE, "../../")) @@ -72,7 +66,4 @@ "set_default_transport", "InMemoryTransport", ] - -if ZeroMQTransport is not None: - __all__.append("ZeroMQTransport") __all_plugins__ = ["compas_eve.rhino.install"] diff --git a/src/compas_eve/zeromq/__init__.py b/src/compas_eve/zeromq/__init__.py index 34ecd1eb..5ecc86ff 100644 --- a/src/compas_eve/zeromq/__init__.py +++ b/src/compas_eve/zeromq/__init__.py @@ -43,7 +43,7 @@ class ZeroMQTransport(Transport, EventEmitterMixin): def __init__(self, endpoint, bind_subscriber=True, *args, **kwargs): if zmq is None: - raise ImportError("pyzmq is required for ZeroMQ transport. Please install it with: pip install pyzmq") + raise ImportError("pyzmq is required for ZeroMQ transport. Please install it with: pip install pyzmq or conda install pyzmq") super(ZeroMQTransport, self).__init__(*args, **kwargs) diff --git a/tests/integration/test_zeromq.py b/tests/integration/test_zeromq.py index bb92d84c..a52b4b90 100644 --- a/tests/integration/test_zeromq.py +++ b/tests/integration/test_zeromq.py @@ -5,17 +5,11 @@ from compas_eve import Subscriber from compas_eve import Topic from compas_eve import set_default_transport - -try: - from compas_eve import ZeroMQTransport - ZEROMQ_AVAILABLE = True -except ImportError: - ZEROMQ_AVAILABLE = False +from compas_eve.zeromq import ZeroMQTransport import pytest -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_zeromq_tcp_pubsub(): """Test ZeroMQ transport with TCP endpoints.""" tx = ZeroMQTransport("tcp://localhost:25555") @@ -36,7 +30,6 @@ def test_zeromq_tcp_pubsub(): tx.close() -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_zeromq_tcp_message_content(): """Test that message content is preserved correctly with TCP transport.""" tx = ZeroMQTransport("tcp://localhost:25556") @@ -72,7 +65,6 @@ def callback(msg): tx.close() -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_zeromq_tcp_multiple_topics(): """Test that multiple topics work correctly with TCP transport.""" tx = ZeroMQTransport("tcp://localhost:25557") @@ -118,7 +110,6 @@ def callback2(msg): tx.close() -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_zeromq_tcp_unsubscribe(): """Test unsubscribe functionality with TCP transport.""" tx = ZeroMQTransport("tcp://localhost:25558") diff --git a/tests/unit/test_zeromq.py b/tests/unit/test_zeromq.py index a4d09c97..622da0b8 100644 --- a/tests/unit/test_zeromq.py +++ b/tests/unit/test_zeromq.py @@ -5,23 +5,16 @@ from compas_eve import Subscriber from compas_eve import Topic from compas_eve import set_default_transport - -try: - from compas_eve import ZeroMQTransport - ZEROMQ_AVAILABLE = True -except ImportError: - ZEROMQ_AVAILABLE = False +from compas_eve.zeromq import ZeroMQTransport import pytest -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_zeromq_import(): """Test that ZeroMQ transport can be imported.""" assert ZeroMQTransport is not None -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_default_transport_publishing(): set_default_transport(ZeroMQTransport("inproc://test1")) event = Event() @@ -34,7 +27,6 @@ def test_default_transport_publishing(): assert received, "Message not received" -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_pubsub(): tx = ZeroMQTransport("inproc://test2") event = Event() @@ -49,7 +41,6 @@ def test_pubsub(): tx.close() -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_two_subs(): tx = ZeroMQTransport("inproc://test3") event1 = Event() @@ -68,7 +59,6 @@ def test_two_subs(): tx.close() -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_unsub(): tx = ZeroMQTransport("inproc://test4") topic = Topic("/messages_compas_eve_test/test_unsub/", Message) @@ -100,7 +90,6 @@ def callback(msg): tx.close() -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_message_type_parsing(): tx = ZeroMQTransport("inproc://test5") event = Event() @@ -123,7 +112,6 @@ def callback(msg): tx.close() -@pytest.mark.skipif(not ZEROMQ_AVAILABLE, reason="ZeroMQ transport not available") def test_dict_as_message(): tx = ZeroMQTransport("inproc://test6") event = Event()