diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 18e6d4d3..3976fc45 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -27,29 +27,6 @@ jobs: run: | pytest tests/unit - build-ironpython: - name: windows-ironpython - runs-on: windows-latest - steps: - - uses: actions/checkout@v2 - - name: Install dependencies - shell: cmd - run: | - curl -o compas.tar.gz -LJO https://pypi.debian.net/compas/latest - curl -o ironpython-pytest.tar.gz -LJO https://pypi.debian.net/ironpython-pytest/latest - choco install ironpython --version=2.7.8.1 - ipy -X:Frames -m ensurepip - ipy -X:Frames -m pip install --no-deps ironpython-pytest.tar.gz - - rem untar and rename, these cannot be installed using ironpip because they not longer have a setup.py - tar -xf compas.tar.gz && for /d %%i in (compas-*) do ren "%%i" compas - - - name: Run tests - env: - IRONPYTHONPATH: ./src;./compas/src - run: | - ipy -m pytest tests/unit - integration_tests: if: "!contains(github.event.pull_request.labels.*.name, 'docs-only')" runs-on: 'ubuntu-latest' @@ -63,6 +40,9 @@ jobs: python: '3.11' invoke_lint: false invoke_test: false + - name: Install test dependencies + run: | + pip install -r tests/integration/requirements.txt - name: Run integration tests run: | pytest tests/integration diff --git a/CHANGELOG.md b/CHANGELOG.md index 98a18f01..5e80c068 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added * Added support for MQTT-PAHO 2.0 versioned callbacks. +* Added `MessageCodec` abstract base class for extensible message serialization. +* Added `JsonMessageCodec` for JSON-based message serialization (default). +* Added `ProtobufMessageCodec` for binary message serialization using Protocol Buffers (requires `compas_pb`). +* Added `codec` parameter to `Transport`, `InMemoryTransport`, and `MqttTransport` classes. ### Changed @@ -17,6 +21,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed +* **BREAKING**: Removed IronPython support and `mqtt_cli` implementation. +* **BREAKING**: Removed support for Rhino 7 (IronPython-based). + ## [1.0.0] 2024-05-27 diff --git a/README.md b/README.md index 1956fb00..87cc87ee 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ Or using `conda`: * Publisher/subscriber communication model (N-to-N communication) * In-process events * MQTT support -* CPython & IronPython support +* Extensible codec system for message serialization (JSON, Protocol Buffers) ## Examples @@ -78,13 +78,28 @@ for i in range(10): This example shows how to send and receive from a single script, but running publishers and subscribers on different scripts, different processes, or even different computers will work the exact same way. +### Using different codecs -### Usage from Rhinoceros 3D +By default, COMPAS EVE uses JSON for message serialization. However, you can use different codecs for more efficient serialization: -It is possible to use the same code from within Rhino/Grasshopper. +```python +import compas_eve as eve +from compas_eve import JsonMessageCodec +from compas_eve.codecs import ProtobufMessageCodec +from compas_eve.mqtt import MqttTransport -Make sure you have installed it to Rhino using the COMPAS installation mechanism: +# Use JSON codec (default) +json_codec = JsonMessageCodec() +tx = MqttTransport("broker.hivemq.com", codec=json_codec) -```bash - python -m compas_rhino.install -v 7.0 +# Or use Protocol Buffers for binary serialization (requires compas_pb) +pb_codec = ProtobufMessageCodec() +tx = MqttTransport("broker.hivemq.com", codec=pb_codec) ``` + + +### Usage from Rhinoceros 3D + +It is possible to use the same code from within Rhino/Grasshopper. + +To install `compas_eve`, use the the syntax `# r: compas_eve` at the top of any Python 3.x script in Rhino/Grasshopper. diff --git a/docs/api.rst b/docs/api.rst index 324eb65d..ea24a92d 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -6,7 +6,8 @@ API Reference :maxdepth: 1 api/compas_eve - api/compas_eve.mqtt + api/compas_eve.codecs api/compas_eve.memory + api/compas_eve.mqtt api/compas_eve.ghpython diff --git a/docs/api/compas_eve.codecs.rst b/docs/api/compas_eve.codecs.rst new file mode 100644 index 00000000..9ac83321 --- /dev/null +++ b/docs/api/compas_eve.codecs.rst @@ -0,0 +1,2 @@ + +.. automodule:: compas_eve.codecs diff --git a/docs/api/compas_eve.rst b/docs/api/compas_eve.rst index 8ce0ba8a..625f24fe 100644 --- a/docs/api/compas_eve.rst +++ b/docs/api/compas_eve.rst @@ -1,5 +1 @@ -******************************************************************************** -compas_eve -******************************************************************************** - .. automodule:: compas_eve diff --git a/examples/codec_usage.py b/examples/codec_usage.py new file mode 100644 index 00000000..815eb5d4 --- /dev/null +++ b/examples/codec_usage.py @@ -0,0 +1,117 @@ +""" +Example demonstrating custom codec usage with COMPAS EVE. + +This example shows how to: +1. Use the default JsonMessageCodec +2. Create a custom codec +3. Use ProtobufMessageCodec (if compas_pb is installed) +""" + +import json + +import compas_eve as eve +from compas_eve import MessageCodec +from compas_eve.codecs import JsonMessageCodec + + +# Example 1: Using the default JSON codec (implicit) +print("=" * 60) +print("Example 1: Default JSON Codec (implicit)") +print("=" * 60) + +pub = eve.Publisher("/example/default") +sub = eve.EchoSubscriber("/example/default") +sub.subscribe() + +pub.publish(eve.Message(text="Hello with default JSON codec", count=1)) +print() + + +# Example 2: Explicitly using JSON codec +print("=" * 60) +print("Example 2: Explicit JSON Codec") +print("=" * 60) + +json_codec = JsonMessageCodec() +transport = eve.InMemoryTransport(codec=json_codec) + +pub = eve.Publisher("/example/json", transport=transport) +sub = eve.EchoSubscriber("/example/json", transport=transport) +sub.subscribe() + +pub.publish(eve.Message(text="Hello with explicit JSON codec", count=2)) +print() + + +# Example 3: Custom codec +print("=" * 60) +print("Example 3: Custom Codec") +print("=" * 60) + + +class UpperCaseCodec(MessageCodec): + """A simple custom codec that converts all string values to uppercase.""" + + def encode(self, message): + """Encode message by converting all string values to uppercase.""" + # Assume message is a Message instance + data = message.data + + # Convert string values to uppercase + encoded_data = {} + for key, value in data.items(): + if isinstance(value, str): + encoded_data[key] = value.upper() + else: + encoded_data[key] = value + return json.dumps(encoded_data) + + def decode(self, encoded_data, message_type): + """Decode data (strings remain uppercase).""" + data = json.loads(encoded_data) + return message_type.parse(data) + + +custom_codec = UpperCaseCodec() +custom_transport = eve.InMemoryTransport(codec=custom_codec) + +pub = eve.Publisher("/example/custom", transport=custom_transport) + +# Create a custom subscriber that prints the message +class PrintSubscriber(eve.Subscriber): + def message_received(self, message): + print(f"Received: {message}") + +sub = PrintSubscriber("/example/custom", transport=custom_transport) +sub.subscribe() + +pub.publish(eve.Message(text="hello world", count=3)) +print() + + +# Example 4: Protocol Buffers codec (if available) +print("=" * 60) +print("Example 4: Protocol Buffers Codec (if available)") +print("=" * 60) + +try: + from compas_eve.codecs import ProtobufMessageCodec + + pb_codec = ProtobufMessageCodec() + pb_transport = eve.InMemoryTransport(codec=pb_codec) + + pub = eve.Publisher("/example/protobuf", transport=pb_transport) + sub = eve.EchoSubscriber("/example/protobuf", transport=pb_transport) + sub.subscribe() + + pub.publish(eve.Message(text="Hello with Protocol Buffers", count=4)) + print("✓ Protocol Buffers codec is working!") + +except ImportError as e: + print(f"Protocol Buffers codec not available: {e}") + print("Install with: pip install compas_pb") + +print() +print("=" * 60) +print("All examples completed!") +print("=" * 60) diff --git a/pyproject.toml b/pyproject.toml index 8cafafb5..53b4a123 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,54 @@ +[build-system] +requires = ["setuptools>=66.0"] +build-backend = "setuptools.build_meta" + +# ============================================================================ +# project info +# ============================================================================ + +[project] +name = "compas_eve" +description = "COMPAS Event Extensions: adds event-based communication infrastructure to the COMPAS framework." +keywords = ["events", "event-driven", "compas", "architecture", "distributed systems"] +authors = [ + { name = "Gonzalo Casas", email = "casas@arch.ethz.ch" }, + { name = "Chen Kasirer", email = "kasirer@arch.ethz.ch" }, +] +license = { file = "LICENSE" } +readme = "README.md" +requires-python = ">=3.9" +dynamic = ['dependencies', 'optional-dependencies', 'version'] +classifiers = [ + "Development Status :: 4 - Beta", + "Topic :: Scientific/Engineering", + "Operating System :: Unix", + "Operating System :: POSIX", + "Operating System :: Microsoft :: Windows", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", +] + +[project.entry-points.'compas_pb.plugins'] +serializers = 'compas_eve.codecs.conversions' + +[project.urls] +Homepage = "https://github.com/compas-dev/compas_eve" +Documentation = "https://compas.dev/compas_eve" +Repository = "https://github.com/compas-dev/compas_eve" +Changelog = "https://github.com/compas-dev/compas_eve/blob/main/CHANGELOG.md" +Issues = "https://github.com/compas-dev/compas_eve/issues" +Forum = "https://forum.compas-framework.org/" + + +# ============================================================================ +# setuptools config +# ============================================================================ + [tool.black] line-length = 120 diff --git a/requirements-dev.txt b/requirements-dev.txt index 4c043dad..e9337a79 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,16 +1,12 @@ attrs >=17.4 -autopep8 -black -bump2version >=1.0.1 -check-manifest >=0.36 -compas_invocations -doc8 -flake8 +black >=22.12.0 +build +bump-my-version +compas_invocations2 +compas_pb >= 0.4.4 invoke >=0.14 -isort -pydocstyle -pytest >=3.2 +pytest-mock +ruff sphinx_compas2_theme twine -wheel --e . +wheel \ No newline at end of file diff --git a/src/compas_eve/__init__.py b/src/compas_eve/__init__.py index 8dadd667..3257e88b 100644 --- a/src/compas_eve/__init__.py +++ b/src/compas_eve/__init__.py @@ -20,6 +20,7 @@ EchoSubscriber Transport InMemoryTransport + MessageCodec get_default_transport set_default_transport @@ -47,6 +48,7 @@ get_default_transport, set_default_transport, ) +from .codecs import MessageCodec from .memory import InMemoryTransport HERE = os.path.dirname(__file__) @@ -62,6 +64,7 @@ "EchoSubscriber", "Topic", "Transport", + "MessageCodec", "get_default_transport", "set_default_transport", "InMemoryTransport", diff --git a/src/compas_eve/codecs/__init__.py b/src/compas_eve/codecs/__init__.py new file mode 100644 index 00000000..1c65497c --- /dev/null +++ b/src/compas_eve/codecs/__init__.py @@ -0,0 +1,190 @@ +""" +******************************************************************************** +compas_eve.codecs +******************************************************************************** + +.. currentmodule:: compas_eve.codecs + + +Classes +======= + +.. autosummary:: + :toctree: generated/ + :nosignatures: + + MessageCodec + JsonMessageCodec + ProtobufMessageCodec + +""" + +from compas.data import json_dumps +from compas.data import json_loads + +__all__ = ["MessageCodec", "JsonMessageCodec", "ProtobufMessageCodec"] + + +class MessageCodec(object): + """Abstract base class for message codecs. + + A codec is responsible for encoding and decoding messages + to/from a specific representation format (e.g., JSON, Protocol Buffers). + """ + + def encode(self, message): + """Encode a message to the codec's representation format. + + Parameters + ---------- + message : :class:`Message` or dict or object + Message to encode. Can be a Message instance, a dict, or + an object implementing the COMPAS data framework. + + Returns + ------- + bytes or str + Encoded representation of the message. + """ + raise NotImplementedError("Subclasses must implement encode()") + + def decode(self, encoded_data): + """Decode data from the codec's representation format. + + Parameters + ---------- + encoded_data : bytes + Encoded data to decode. + + Returns + ------- + :class:`Message` or dict or object + Decoded message after reconstruction from the encoded data. + """ + raise NotImplementedError("Subclasses must implement decode()") + + +class JsonMessageCodec(MessageCodec): + """JSON codec for message serialization. + + This codec uses the COMPAS framework's JSON serialization functions + to encode and decode message data. It can handle Message objects, + COMPAS Data objects, and regular dictionaries. + """ + + def encode(self, message): + """Encode a message to JSON string. + + Parameters + ---------- + message : :class:`Message` or dict or object + Message to encode. Can be a Message instance, a dict, or + an object implementing the COMPAS data framework. + + Returns + ------- + str + JSON string representation of the message. + """ + # Extract data from the message + try: + return json_dumps(message.data) + except (KeyError, AttributeError): + try: + return json_dumps(message) + except (KeyError, AttributeError): + return json_dumps(dict(message)) + + def decode(self, encoded_data, message_type): + """Decode JSON message payloads to message object. + + Parameters + ---------- + encoded_data : bytes + Message bytes to decode into a JSON string. + message_type : type + The message type class to use for parsing. + + Returns + ------- + :class:`Message` + Decoded message object. + """ + data = json_loads(encoded_data.decode()) + if hasattr(data, "__data__"): + return data + else: + return message_type.parse(data) + + +try: + import compas_pb + + COMPAS_PB_AVAILABLE = True +except ImportError: + COMPAS_PB_AVAILABLE = False + + +class ProtobufMessageCodec(MessageCodec): + """Protocol Buffers codec for message serialization. + + This codec uses the compas_pb package to encode and decode message data + using Protocol Buffers binary format. + + Note + ---- + This codec requires the ``compas_pb`` package to be installed. + If ``compas_pb`` is not available, attempting to encode or decode + will raise an ImportError. + """ + + def __init__(self): + super(ProtobufMessageCodec, self).__init__() + if not COMPAS_PB_AVAILABLE: + raise ImportError( + "The ProtobufMessageCodec requires 'compas_pb' to be installed. " + "Please install it with: pip install compas_pb" + ) + + def encode(self, message): + """Encode a message to Protocol Buffers binary format. + + Parameters + ---------- + message : :class:`Message` or dict or object + Message to encode. Can be a Message instance, a dict, or + an object implementing the COMPAS data framework. + + Returns + ------- + bytes + Protocol Buffers binary representation of the message. + """ + if not COMPAS_PB_AVAILABLE: + raise ImportError( + "The ProtobufMessageCodec requires 'compas_pb' to be installed. " + "Please install it with: pip install compas_pb" + ) + return compas_pb.pb_dump_bts(message) + + def decode(self, encoded_data, message_type=None): + """Decode Protocol Buffers binary data to message object. + + Parameters + ---------- + encoded_data : bytes + Protocol Buffers binary data to decode. + message_type : type, optional + The message type class (not used for protobuf as it's encoded in the data). + + Returns + ------- + object + Decoded message object. + """ + if not COMPAS_PB_AVAILABLE: + raise ImportError( + "The ProtobufMessageCodec requires 'compas_pb' to be installed. " + "Please install it with: pip install compas_pb" + ) + return compas_pb.pb_load_bts(encoded_data) diff --git a/src/compas_eve/codecs/conversions.py b/src/compas_eve/codecs/conversions.py new file mode 100644 index 00000000..b13517f8 --- /dev/null +++ b/src/compas_eve/codecs/conversions.py @@ -0,0 +1,22 @@ +from compas_pb.core import _deserialize_any +from compas_pb.core import _serializer_any +from compas_pb.registry import pb_deserializer +from compas_pb.registry import pb_serializer + +from compas_eve.proto import message_pb2 +from compas_eve import Message + + +@pb_serializer(Message) +def message_to_pb(message: Message) -> message_pb2.Message: + pb = message_pb2.Message() + for k, v in message.data.items(): + pb.data[k].CopyFrom(_serializer_any(v)) + return pb + +@pb_deserializer(message_pb2.Message) +def message_from_pb(pb: message_pb2.Message) -> Message: + message = Message() + for k, v in pb.data.items(): + message[k] = _deserialize_any(v) + return message diff --git a/src/compas_eve/core.py b/src/compas_eve/core.py index 768f254d..ee1241c7 100644 --- a/src/compas_eve/core.py +++ b/src/compas_eve/core.py @@ -1,5 +1,4 @@ -from compas.data import json_dumps -from compas.data import json_loads +from compas_eve.codecs import JsonMessageCodec DEFAULT_TRANSPORT = None @@ -30,11 +29,21 @@ def set_default_transport(transport): class Transport(object): - """Defines the base interface for different transport implementations.""" + """Defines the base interface for different transport implementations. - def __init__(self, *args, **kwargs): + Parameters + ---------- + codec : :class:`MessageCodec`, optional + The codec to use for encoding and decoding messages. + If not provided, defaults to :class:`JsonMessageCodec`. + """ + + def __init__(self, codec=None, *args, **kwargs): super(Transport, self).__init__(*args, **kwargs) self._id_counter = 0 + if codec is None: + codec = JsonMessageCodec() + self.codec = codec @property def id_counter(self): @@ -123,26 +132,6 @@ def __init__(self, name, message_type=None, **options): self.message_type = message_type or Message self.options = options - def _message_to_json(self, message): - """Convert a message to a JSON string. - - Normally, this method expects sub-classes of ``Message`` as input. - However, it can deal with regular dictionaries as well as classes - implementing the COMPAS data framework. - """ - try: - data = message.data - except (KeyError, AttributeError): - try: - data = message.__data__ - except (KeyError, AttributeError): - data = dict(message) - return json_dumps(data) - - def _message_from_json(self, json_message): - """Converts a JSON string back into a message instance.""" - return self.message_type.parse(json_loads(json_message)) - class Publisher(object): """Publisher for a specific topic. diff --git a/src/compas_eve/memory/__init__.py b/src/compas_eve/memory/__init__.py index 637c48cb..e50c1583 100644 --- a/src/compas_eve/memory/__init__.py +++ b/src/compas_eve/memory/__init__.py @@ -26,10 +26,17 @@ class InMemoryTransport(Transport, EventEmitterMixin): """In-Memory transport is ideal for simple single-process apps and testing. - It will only distribute messages within the same process, not across different processes.""" + It will only distribute messages within the same process, not across different processes. - def __init__(self, *args, **kwargs): - super(InMemoryTransport, self).__init__(*args, **kwargs) + Parameters + ---------- + codec : :class:`MessageCodec`, optional + The codec to use for encoding and decoding messages. + If not provided, defaults to :class:`JsonMessageCodec`. + """ + + def __init__(self, codec=None, *args, **kwargs): + super(InMemoryTransport, self).__init__(codec=codec, *args, **kwargs) self._local_callbacks = {} def on_ready(self, callback): @@ -49,7 +56,9 @@ def publish(self, topic, message): event_key = "event:{}".format(topic.name) def _callback(**kwargs): - self.emit(event_key, message) + encoded_message = self.codec.encode(message) + encoded_message_bytes = encoded_message if isinstance(encoded_message, bytes) else encoded_message.encode('utf-8') + self.emit(event_key, encoded_message_bytes) self.on_ready(_callback) @@ -74,10 +83,14 @@ def subscribe(self, topic, callback): event_key = "event:{}".format(topic.name) subscribe_id = "{}:{}".format(event_key, id(callback)) + def _local_callback(msg): + message_obj = self.codec.decode(msg, topic.message_type) + callback(message_obj) + def _callback(**kwargs): - self.on(event_key, callback) + self.on(event_key, _local_callback) - self._local_callbacks[subscribe_id] = callback + self._local_callbacks[subscribe_id] = _local_callback self.on_ready(_callback) diff --git a/src/compas_eve/mqtt/__init__.py b/src/compas_eve/mqtt/__init__.py index 88fb6b08..6a40e2b1 100644 --- a/src/compas_eve/mqtt/__init__.py +++ b/src/compas_eve/mqtt/__init__.py @@ -17,11 +17,6 @@ """ -import sys - -if sys.platform == "cli": - from .mqtt_cli import MqttTransport -else: - from .mqtt_paho import MqttTransport +from .mqtt_paho import MqttTransport __all__ = ["MqttTransport"] diff --git a/src/compas_eve/mqtt/mqtt_cli.py b/src/compas_eve/mqtt/mqtt_cli.py deleted file mode 100644 index 8412badb..00000000 --- a/src/compas_eve/mqtt/mqtt_cli.py +++ /dev/null @@ -1,135 +0,0 @@ -# fmt: off -from __future__ import print_function - -from ..core import Transport -from ..event_emitter import EventEmitterMixin - -import clr -import os -import sys - -lib_dir = os.path.join(os.path.dirname(__file__), "netlib") -if lib_dir not in sys.path: - sys.path.append(lib_dir) - -clr.AddReference("MQTTnet") - -from System import Action -from System.Text import Encoding -from System.Threading import CancellationToken, CancellationTokenSource -from System.Threading.Tasks import Task - -from MQTTnet import MqttFactory -from MQTTnet import MqttApplicationMessageBuilder -from MQTTnet.Client import MqttClientConnectResult -from MQTTnet.Client import MqttClientConnectResultCode -from MQTTnet.Client import MqttClientOptionsBuilder -from MQTTnet.Client import MqttClientDisconnectOptionsBuilder - - -class MqttTransport(Transport, EventEmitterMixin): - """MQTT transport allows sending and receiving messages using an MQTT broker. - - Parameters - ---------- - host : str - Host name for the MQTT broker, e.g. ``broker.hivemq.com`` or ``localhost`` if - you are running a local broker on your machine. - port : int - MQTT broker port, defaults to ``1883``. - """ - def __init__(self, host, port=1883, *args, **kwargs): - super(MqttTransport, self).__init__(*args, **kwargs) - self.host = host - self.port = port - self._is_connected = False - self._local_callbacks = {} - - self.cancellation_token_source = CancellationTokenSource() - self.cancellation_token = self.cancellation_token_source.Token - - self.factory = MqttFactory() - options = MqttClientOptionsBuilder().WithTcpServer(host, port).Build() - - self.client = self.factory.CreateMqttClient() - self.client.ConnectAsync(options, self.cancellation_token).ContinueWith.Overloads[Action[Task[MqttClientConnectResult]]](self._on_connect) - - - def _on_connect(self, event_args): - if event_args.Result.ResultCode == MqttClientConnectResultCode.Success: - self._is_connected = event_args.Result.ResultCode == MqttClientConnectResultCode.Success - self.emit("ready") - - def close(self): - options = MqttClientDisconnectOptionsBuilder().WithReasonString("Normal").Build() - self.client.DisconnectAsync(options, CancellationToken.None) # noqa: E999 (disable flake8 error, which incorrectly parses None as the python keyword) - - def on_ready(self, callback): - if self._is_connected: - callback() - else: - self.once("ready", callback) - - def publish(self, topic, message): - json_message = topic._message_to_json(message) - application_message = ( - MqttApplicationMessageBuilder() - .WithTopic(topic.name) - .WithPayload(json_message) - .Build() - ) - - def _callback(**kwargs): - self.client.PublishAsync(application_message, CancellationToken.None) - - self.on_ready(_callback) - - def subscribe(self, topic, callback): - event_key = "event:{}".format(topic.name) - subscribe_id = "{}:{}".format(event_key, id(callback)) - - def _local_callback(application_message): - payload = Encoding.UTF8.GetString(application_message.Payload) - msg = topic._message_from_json(payload) - callback(msg) - - def _subscribe_callback(**kwargs): - subscribe_opts = self.factory.CreateSubscribeOptionsBuilder().WithTopicFilter(lambda f: f.WithTopic(topic.name)).Build() - self.client.ApplicationMessageReceivedAsync += self._on_message - self.client.SubscribeAsync(subscribe_opts, self.cancellation_token) - self.on(event_key, _local_callback) - - self._local_callbacks[subscribe_id] = _local_callback - - self.on_ready(_subscribe_callback) - - return subscribe_id - - def _on_message(self, event_args): - event_key = "event:{}".format(event_args.ApplicationMessage.Topic) - self.emit(event_key, event_args.ApplicationMessage) - - def advertise(self, topic): - advertise_id = "advertise:{}:{}".format(topic.name, self.id_counter) - # mqtt does not need anything here - return advertise_id - - def unadvertise(self, topic): - pass - - def unsubscribe_by_id(self, subscribe_id): - ev_type, topic_name, _callback_id = subscribe_id.split(":") - event_key = "{}:{}".format(ev_type, topic_name) - - callback = self._local_callbacks[subscribe_id] - self.off(event_key, callback) - del self._local_callbacks[subscribe_id] - - unsubscribe_opts = self.factory.CreateUnsubscribeOptionsBuilder().WithTopicFilter(topic_name).Build() - self.client.UnsubscribeAsync(unsubscribe_opts, self.cancellation_token) - self.client.ApplicationMessageReceivedAsync -= self._on_message - - def unsubscribe(self, topic): - unsubscribe_opts = self.factory.CreateUnsubscribeOptionsBuilder().WithTopicFilter(topic.name).Build() - self.client.UnsubscribeAsync(unsubscribe_opts, self.cancellation_token) - self.client.ApplicationMessageReceivedAsync -= self._on_message diff --git a/src/compas_eve/mqtt/mqtt_paho.py b/src/compas_eve/mqtt/mqtt_paho.py index d9826a8c..5ac217c8 100644 --- a/src/compas_eve/mqtt/mqtt_paho.py +++ b/src/compas_eve/mqtt/mqtt_paho.py @@ -24,10 +24,13 @@ class MqttTransport(Transport, EventEmitterMixin): MQTT broker port, defaults to ``1883``. client_id : str, optional Client ID for the MQTT connection. If not provided, a unique ID will be generated. + codec : :class:`MessageCodec`, optional + The codec to use for encoding and decoding messages. + If not provided, defaults to :class:`JsonMessageCodec`. """ - def __init__(self, host, port=1883, client_id=None, *args, **kwargs): - super(MqttTransport, self).__init__(*args, **kwargs) + def __init__(self, host, port=1883, client_id=None, codec=None, *args, **kwargs): + super(MqttTransport, self).__init__(codec=codec, *args, **kwargs) self.host = host self.port = port self._is_connected = False @@ -76,8 +79,8 @@ def publish(self, topic, message): """ def _callback(**kwargs): - json_message = topic._message_to_json(message) - self.client.publish(topic.name, json_message) + encoded_message = self.codec.encode(message) + self.client.publish(topic.name, encoded_message) self.on_ready(_callback) @@ -103,8 +106,8 @@ def subscribe(self, topic, callback): subscribe_id = "{}:{}".format(event_key, id(callback)) def _local_callback(msg): - msg = topic._message_from_json(msg.payload.decode()) - callback(msg) + message_obj = self.codec.decode(msg.payload, topic.message_type) + callback(message_obj) def _subscribe_callback(**kwargs): self.client.subscribe(topic.name) diff --git a/src/compas_eve/mqtt/netlib/MQTTnet.dll b/src/compas_eve/mqtt/netlib/MQTTnet.dll deleted file mode 100644 index a2baf59f..00000000 Binary files a/src/compas_eve/mqtt/netlib/MQTTnet.dll and /dev/null differ diff --git a/src/compas_eve/proto/message.proto b/src/compas_eve/proto/message.proto new file mode 100644 index 00000000..e70e634b --- /dev/null +++ b/src/compas_eve/proto/message.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package compas_eve; + +import "compas_pb/generated/message.proto"; + +message Message { + map data = 1; +} \ No newline at end of file diff --git a/src/compas_eve/proto/message_pb2.py b/src/compas_eve/proto/message_pb2.py new file mode 100644 index 00000000..42038b3c --- /dev/null +++ b/src/compas_eve/proto/message_pb2.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: message.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'message.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from compas_pb.generated import message_pb2 as compas__pb_dot_generated_dot_message__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rmessage.proto\x12\ncompas_eve\x1a!compas_pb/generated/message.proto\"|\n\x07Message\x12+\n\x04\x64\x61ta\x18\x01 \x03(\x0b\x32\x1d.compas_eve.Message.DataEntry\x1a\x44\n\tDataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12&\n\x05value\x18\x02 \x01(\x0b\x32\x17.compas_pb.data.AnyData:\x02\x38\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'message_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_MESSAGE_DATAENTRY']._loaded_options = None + _globals['_MESSAGE_DATAENTRY']._serialized_options = b'8\001' + _globals['_MESSAGE']._serialized_start=64 + _globals['_MESSAGE']._serialized_end=188 + _globals['_MESSAGE_DATAENTRY']._serialized_start=120 + _globals['_MESSAGE_DATAENTRY']._serialized_end=188 +# @@protoc_insertion_point(module_scope) diff --git a/tasks.py b/tasks.py index cd41ed61..f47c87e5 100644 --- a/tasks.py +++ b/tasks.py @@ -1,13 +1,17 @@ from __future__ import print_function import os +from pathlib import Path -from compas_invocations import build -from compas_invocations import docs -from compas_invocations import style -from compas_invocations import tests +from compas_invocations2 import build +from compas_invocations2 import docs +from compas_invocations2 import style +from compas_invocations2 import tests from invoke import Collection +import compas_pb +from compas_pb.invocations import generate_proto_classes + ns = Collection( docs.help, style.check, @@ -21,6 +25,7 @@ build.prepare_changelog, build.clean, build.release, + generate_proto_classes, ) ns.configure( { @@ -30,5 +35,8 @@ "source_dir": "src/compas_eve/ghpython/components", "target_dir": "src/compas_eve/ghpython/components/ghuser", }, + "proto_folder": Path("./src") / "compas_eve" / "proto", + "proto_include_paths": [Path("./src") / "compas_eve" / "proto", compas_pb.PROTOBUF_DEFS], + "proto_out_folder": Path("./src") / "compas_eve" / "proto", } ) diff --git a/tests/integration/requirements.txt b/tests/integration/requirements.txt new file mode 100644 index 00000000..0a9cfca4 --- /dev/null +++ b/tests/integration/requirements.txt @@ -0,0 +1 @@ +compas_pb >= 0.4.4 diff --git a/tests/integration/test_mqtt.py b/tests/integration/test_mqtt.py index 05c9fa9e..8fdb181e 100644 --- a/tests/integration/test_mqtt.py +++ b/tests/integration/test_mqtt.py @@ -1,6 +1,7 @@ from threading import Event -from compas.data import Data +from compas.geometry import Frame +from compas.datastructures import Graph from compas_eve import Message from compas_eve import Publisher @@ -116,39 +117,6 @@ def callback(msg): def test_compas_data_as_message(): - - class Header(Data): - def __init__(self, sequence_id=None): - super(Header, self).__init__() - self.sequence_id = sequence_id - - @property - def __data__(self): - return {"sequence_id": self.sequence_id} - - class DataTestMessage(Data): - def __init__(self, name=None, location=None, header=None): - super(DataTestMessage, self).__init__() - self.name = name - self.location = location - self.header = header or Header(1) - - @property - def __data__(self): - return {"name": self.name, "location": self.location, "header": self.header.__data__} - - @classmethod - def __from_data__(cls, data): - return cls( - name=data["name"], - location=data["location"], - header=Header.__from_data__(data["header"]), - ) - - @classmethod - def parse(cls, value): - return cls.__from_data__(value) - result = dict(value=None, event=Event()) def callback(msg): @@ -156,16 +124,17 @@ def callback(msg): result["event"].set() tx = MqttTransport(HOST) - topic = Topic("/messages_compas_eve_test/test_compas_data_as_message/", DataTestMessage) + topic = Topic("/messages_compas_eve_test/test_compas_data_as_message/") Subscriber(topic, callback, transport=tx).subscribe() - Publisher(topic, transport=tx).publish(DataTestMessage(name="Jazz", location=1.334)) + Publisher(topic, transport=tx).publish(dict(frame=Frame.worldXY(), graph=Graph())) + assert result is not None, "No result?" received = result["event"].wait(timeout=3) assert received, "Message not received" - assert result["value"].name == "Jazz" - assert result["value"].location == 1.334 - assert result["value"].header.sequence_id == 1 + assert result["value"]["frame"] == Frame.worldXY() + assert isinstance(result["value"]["graph"], Graph) + def test_nested_message_types(): diff --git a/tests/unit/test_codecs.py b/tests/unit/test_codecs.py new file mode 100644 index 00000000..a81e6381 --- /dev/null +++ b/tests/unit/test_codecs.py @@ -0,0 +1,171 @@ +from compas.geometry import Frame +from compas_eve import Message +from compas_eve.codecs import JsonMessageCodec +from compas_eve.codecs import ProtobufMessageCodec + + +def test_json_codec_encode_decode(): + codec = JsonMessageCodec() + + original_message = Message(name="test", value=42, active=True) + encoded = codec.encode(original_message) + wire_encoding = encoded.encode('utf-8') # simulate wire encoding + decoded = codec.decode(wire_encoding, Message) + + assert isinstance(encoded, str) + assert isinstance(decoded, Message) + assert decoded.name == "test" + assert decoded.value == 42 + assert decoded.active is True + + +def test_json_codec_nested_data(): + codec = JsonMessageCodec() + + nested_data = { + "coordinates": [1.0, 2.0, 3.0], + "metadata": {"author": "test", "version": 1}, + "tags": ["geometry", "point"] + } + + original_message = Message(**nested_data) + encoded = codec.encode(original_message).encode('utf-8') + decoded = codec.decode(encoded, Message) + + assert decoded.coordinates == [1.0, 2.0, 3.0] + assert decoded.metadata["author"] == "test" + assert decoded.metadata["version"] == 1 + assert decoded.tags == ["geometry", "point"] + + +def test_json_codec_empty_message(): + codec = JsonMessageCodec() + + original_message = Message() + encoded = codec.encode(original_message).encode('utf-8') + decoded = codec.decode(encoded, Message) + + assert isinstance(decoded, Message) + assert str(decoded) == "{}" + + +def test_json_codec_roundtrip(): + """Test JSON codec maintains data integrity through multiple encode/decode cycles.""" + codec = JsonMessageCodec() + + original_message = Message( + string_val="hello world", + int_val=123, + float_val=3.14159, + bool_val=True, + null_val=None, + list_val=[1, 2, 3], + dict_val={"key": "value"} + ) + + # Multiple roundtrips + current = original_message + for _ in range(3): + encoded = codec.encode(current).encode('utf-8') + current = codec.decode(encoded, Message) + + assert current.string_val == "hello world" + assert current.int_val == 123 + assert current.float_val == 3.14159 + assert current.bool_val is True + assert current.null_val is None + assert current.list_val == [1, 2, 3] + assert current.dict_val == {"key": "value"} + + +def test_protobuf_codec_encode_decode(): + codec = ProtobufMessageCodec() + + # Test with simple message + original_message = Message(name="test", value=42, active=True, frame=Frame.worldXY()) + encoded = codec.encode(original_message.data) + decoded = codec.decode(encoded, dict) + + assert isinstance(encoded, bytes) + assert isinstance(decoded, dict) + assert decoded["name"] == "test" + assert decoded["value"] == 42 + assert decoded["active"] is True + assert decoded["frame"].point == [0.0, 0.0, 0.0] + assert decoded["frame"].xaxis == [1.0, 0.0, 0.0] + assert decoded["frame"].yaxis == [0.0, 1.0, 0.0] + + +def test_protobuf_codec_nested_data(): + """Test Protobuf codec handles nested data structures.""" + codec = ProtobufMessageCodec() + + nested_data = { + "coordinates": [1.0, 2.0, 3.0], + "metadata": {"author": "test", "version": 1}, + "tags": ["geometry", "point"] + } + + original_message = Message(**nested_data) + encoded = codec.encode(original_message.data) + decoded = codec.decode(encoded) + + assert decoded["coordinates"] == [1.0, 2.0, 3.0] + assert decoded["metadata"]["author"] == "test" + assert decoded["metadata"]["version"] == 1 + assert decoded["tags"] == ["geometry", "point"] + + +def test_protobuf_codec_roundtrip(): + """Test Protobuf codec maintains data integrity through multiple encode/decode cycles.""" + codec = ProtobufMessageCodec() + + original_message = Message( + string_val="hello world", + int_val=123, + float_val=3.14159, + bool_val=True, + list_val=[1, 2, 3], + dict_val={"key": "value"} + ) + original_message = original_message.data + + # Multiple roundtrips + current = original_message + for _ in range(3): + encoded = codec.encode(current) + current = codec.decode(encoded) + + assert current["string_val"] == "hello world" + assert current["int_val"] == 123 + assert current["float_val"] == 3.14159 + assert current["bool_val"] is True + assert current["list_val"] == [1, 2, 3] + assert current["dict_val"] == {"key": "value"} + + +def test_codec_compatibility(): + """Test that both codecs produce equivalent results for the same message.""" + json_codec = JsonMessageCodec() + protobuf_codec = ProtobufMessageCodec() + + original_message = Message( + name="compatibility_test", + count=100, + enabled=False, + data=[1, 2, 3, 4, 5] + ) + + # Encode with both codecs + json_encoded = json_codec.encode(original_message).encode('utf-8') + protobuf_encoded = protobuf_codec.encode(original_message.data) + + # Decode with respective codecs + json_decoded = json_codec.decode(json_encoded, Message) + protobuf_decoded = protobuf_codec.decode(protobuf_encoded, Message) + + # Both should produce equivalent messages + assert json_decoded["name"] == protobuf_decoded["name"] == "compatibility_test" + assert json_decoded["count"] == protobuf_decoded["count"] == 100 + assert json_decoded["enabled"] == protobuf_decoded["enabled"] is False + assert json_decoded["data"] == protobuf_decoded["data"] == [1, 2, 3, 4, 5] diff --git a/tests/unit/test_mqtt_paho_compatibility.py b/tests/unit/test_mqtt_paho_compatibility.py index c9b18399..3cbab255 100644 --- a/tests/unit/test_mqtt_paho_compatibility.py +++ b/tests/unit/test_mqtt_paho_compatibility.py @@ -1,47 +1,46 @@ -import sys - -if sys.platform != "cli": - import pytest - from unittest.mock import Mock, patch - from compas_eve.mqtt.mqtt_paho import MqttTransport, PAHO_MQTT_V2_AVAILABLE - - def test_paho_mqtt_v1_compatibility(): - with patch("compas_eve.mqtt.mqtt_paho.PAHO_MQTT_V2_AVAILABLE", False), patch( - "paho.mqtt.client.Client" - ) as mock_client_class: - - mock_client = Mock() - mock_client_class.return_value = mock_client - - # This should work as if paho-mqtt 1.x is installed - transport = MqttTransport("localhost") - - # Should have called mqtt.Client() with client_id parameter only (no callback_api_version) - mock_client_class.assert_called_once() - call_args = mock_client_class.call_args - assert "client_id" in call_args.kwargs - assert call_args.kwargs["client_id"].startswith("compas_eve_") - assert "callback_api_version" not in call_args.kwargs - assert transport.client == mock_client - - def test_paho_mqtt_v2_compatibility(): - if not PAHO_MQTT_V2_AVAILABLE: - pytest.skip("paho-mqtt 2.x not available in this environment") - - with patch("paho.mqtt.client.Client") as mock_client_class: - from paho.mqtt.enums import CallbackAPIVersion - - mock_client = Mock() - mock_client_class.return_value = mock_client - - # This should work as if paho-mqtt 2.x is installed - transport = MqttTransport("localhost") - - # Should have called mqtt.Client() with both client_id and callback_api_version parameters - mock_client_class.assert_called_once() - call_args = mock_client_class.call_args - assert "client_id" in call_args.kwargs - assert call_args.kwargs["client_id"].startswith("compas_eve_") - assert "callback_api_version" in call_args.kwargs - assert call_args.kwargs["callback_api_version"] == CallbackAPIVersion.VERSION1 - assert transport.client == mock_client +import pytest +from unittest.mock import Mock, patch +from compas_eve.mqtt.mqtt_paho import MqttTransport, PAHO_MQTT_V2_AVAILABLE + + +def test_paho_mqtt_v1_compatibility(): + with patch("compas_eve.mqtt.mqtt_paho.PAHO_MQTT_V2_AVAILABLE", False), patch( + "paho.mqtt.client.Client" + ) as mock_client_class: + + mock_client = Mock() + mock_client_class.return_value = mock_client + + # This should work as if paho-mqtt 1.x is installed + transport = MqttTransport("localhost") + + # Should have called mqtt.Client() with client_id parameter only (no callback_api_version) + mock_client_class.assert_called_once() + call_args = mock_client_class.call_args + assert "client_id" in call_args.kwargs + assert call_args.kwargs["client_id"].startswith("compas_eve_") + assert "callback_api_version" not in call_args.kwargs + assert transport.client == mock_client + + +def test_paho_mqtt_v2_compatibility(): + if not PAHO_MQTT_V2_AVAILABLE: + pytest.skip("paho-mqtt 2.x not available in this environment") + + with patch("paho.mqtt.client.Client") as mock_client_class: + from paho.mqtt.enums import CallbackAPIVersion + + mock_client = Mock() + mock_client_class.return_value = mock_client + + # This should work as if paho-mqtt 2.x is installed + transport = MqttTransport("localhost") + + # Should have called mqtt.Client() with both client_id and callback_api_version parameters + mock_client_class.assert_called_once() + call_args = mock_client_class.call_args + assert "client_id" in call_args.kwargs + assert call_args.kwargs["client_id"].startswith("compas_eve_") + assert "callback_api_version" in call_args.kwargs + assert call_args.kwargs["callback_api_version"] == CallbackAPIVersion.VERSION1 + assert transport.client == mock_client