Skip to content

Add support for rosidl::Buffer-aware per-endpoint pub/sub (backport #930)#987

Open
mergify[bot] wants to merge 1 commit into
lyricalfrom
mergify/bp/lyrical/pr-930
Open

Add support for rosidl::Buffer-aware per-endpoint pub/sub (backport #930)#987
mergify[bot] wants to merge 1 commit into
lyricalfrom
mergify/bp/lyrical/pr-930

Conversation

@mergify

@mergify mergify Bot commented Jun 4, 2026

Copy link
Copy Markdown

Description

This pull request adds full rosidl::Buffer support to rmw_zenoh_cpp, enabling per-endpoint Zenoh publishers and subscribers for zero-copy buffer transport between compatible backends. When a publisher and subscriber share compatible non-CPU buffer backends, data can be transferred via a lightweight descriptor; when backends are incompatible, the system falls back to standard CPU-based buffer serialization.

This pull request consists of the following key changes:

  • Backend lifecycle: Calls rosidl_buffer_backend_registry::initialize_buffer_backends() / shutdown_buffer_backends() during RMW init/shutdown to load and tear down buffer backend plugins.
  • Liveliness key-expression extension: Extended key-expressions to advertise each endpoint's supported backends, enabling dynamic discovery.
  • Graph cache discovery callbacks: Buffer-aware publishers and subscribers register discovery callbacks to detect each other and dynamically create per-endpoint Zenoh publishers/subscribers.
  • Buffer-aware publishers: Create per-subscriber Zenoh endpoints; endpoint info is passed to the typesupport serialization layer, which delegates compatibility to each backend's create_descriptor_with_endpoint() (nullptr: CPU fallback). Publisher creation explicitly adds "cpu" to backend_aux_info.
  • Fallback publish: publish() first sends endpoint-aware messages via publish_buffer_aware(), then conditionally falls through to the standard base-key publish path only when the total matched subscription count exceeds discovered buffer-aware subscribers -- avoiding unnecessary CPU conversion when all subscribers are buffer-aware.
  • Buffer-aware subscribers: Create per-publisher Zenoh subscriptions; the Message struct owns endpoint info via std::optional<EndpointInfoStorage> Endpoint info is passed into deserialization for correct backend reconstruction.
  • acceptable_buffer_backends: Parses the subscription option -- NULL/empty/"cpu": CPU-only (advertises "cpu" in liveliness token); "any": all installed; specific names: filtered. In on_publisher_discovered(), CPU is always added to the publisher's backend list.

Is this user-facing behavior change?

This pull request does not change existing rmw_zenoh_cpp behavior for standard (non-Buffer) messages. For messages with uint8[] fields, the per-endpoint transport is transparent -- publishers and subscribers share backend info automatically, and CPU fallback ensures correctness when backends are incompatible.

Did you use Generative AI?

Yes. Claude (claude-4.6-opus) via Cursor was used to assist with creating an initial prototype version of the changes contained in this PR.

Additional Information

This PR is part of the broader ROS 2 native buffer feature introduced in this post.


This is an automatic backport of pull request #930 done by Mergify.

* Add support for rosidl::Buffer-aware per-endpoint pub/sub

Signed-off-by: CY Chen <cyc@nvidia.com>

* Add buffer backend init/shutdown functions

Signed-off-by: CY Chen <cyc@nvidia.com>

* Fix lint error

Signed-off-by: CY Chen <cyc@nvidia.com>

* Rename backend aux info to backend metadata

Signed-off-by: CY Chen <cyc@nvidia.com>

* Update to use per-context buffer backend registry support

Signed-off-by: CY Chen <cyc@nvidia.com>

* Add support for legacy subscribers in the rosidl::buffer path

Signed-off-by: CY Chen <cyc@nvidia.com>

* Add CPU group endpoints

Signed-off-by: CY Chen <cyc@nvidia.com>

* Use per-topic CPU channels

Signed-off-by: CY Chen <cyc@nvidia.com>

* fix(graph_cache): release graph_mutex_ before invoking discovery callbacks

update_topic_map_for_put() collected discovery callbacks under
discovery_mutex_ but still invoked them while graph_mutex_ was held
(via the lock_guard in parse_put).  Any callback that re-enters
graph_cache — e.g. creating a per-endpoint subscription which calls
register_publisher_discovery_callback() — would attempt to re-acquire
graph_mutex_ on the same thread, deadlocking immediately.

Fix: change update_topic_map_for_put() and update_topic_maps_for_put()
to return the collected callbacks instead of invoking them.  parse_put()
switches from lock_guard to unique_lock so it can call lock.unlock()
before iterating over the returned callbacks.

This is a defensive complement to the lock-order fix in e91c15a.
While no current callback directly re-acquires graph_mutex_, invoking
external callbacks under an internal mutex is an API contract violation
that creates fragility for future changes.

Signed-off-by: YuanYu Yuan <yuanyu.yuan@zettascale.tech>

* style: fix uncrustify line-length in is_cpu_only_backend_metadata

Signed-off-by: YuanYu Yuan <yuanyu.yuan@zettascale.tech>

* Use single shared accelerated channel per buffer-aware subscriber

Signed-off-by: CY Chen <cyc@nvidia.com>

* Address review comments

Signed-off-by: CY Chen <cyc@nvidia.com>

* fix(liveliness): escape '/' in backend names embedded in key expressions

* Update buffer size estimation to align with rmw_fastrtps_cpp

Signed-off-by: CY Chen <cyc@nvidia.com>

* Change logs for rosidl::Buffer to DEBUG level

Signed-off-by: CY Chen <cyc@nvidia.com>

* Replace RCUTILS_LOG_DEBUG_NAMED with RMW_ZENOH_ROSIDL_BUFFER_LOG_DEBUG_NAMED

Signed-off-by: CY Chen <cyc@nvidia.com>

---------

Signed-off-by: CY Chen <cyc@nvidia.com>
Signed-off-by: YuanYu Yuan <yuanyu.yuan@zettascale.tech>
Co-authored-by: yuanyuyuan <az6980522@gmail.com>
(cherry picked from commit e95c62d)
@YuanYuYuan

Copy link
Copy Markdown
Contributor

Pulls: #987
Gist: https://gist.githubusercontent.com/YuanYuYuan/cc38741930c9d04c343085d9166a2e54/raw/e7dd17e791d0c6c5460aee125dbc2342a8c824a6/ros2.repos
BUILD args: --packages-above-and-dependencies rmw_zenoh_cpp
TEST args: --packages-above rmw_zenoh_cpp
ROS Distro: lyrical
Job: ci_launcher
ci_launcher ran: https://ci.ros2.org/job/ci_launcher/19468

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Linux-rhel Build Status
  • Windows Build Status

@skyegalaxy skyegalaxy assigned skyegalaxy and unassigned skyegalaxy Jun 4, 2026
@YuanYuYuan

Copy link
Copy Markdown
Contributor

CI Test Results

Platform Result Failed Passed Skipped Duration Report
Linux ✅ pass 0 17393 2647 132m 24s results
Linux-aarch64 ❌ fail 2 17440 2714 125m 44s results
Linux-rhel ❌ fail 6 17190 2704 209m 41s results
Windows ❌ fail 42 16943 2817 489m 34s results
Linux-aarch64 — 2 failure(s)
Package Test
CTest test_executor
rclpy.rclpy.test.test_executor.TestExecutor test_coroutine_exception_after_await
Linux-rhel — 6 failure(s)
Package Test
CTest uncrustify
CTest test_executor
rclpy.rclpy.test.test_executor.TestExecutor test_coroutine_exception_after_await
tf2_ros.uncrustify include/tf2_ros/static_transform_broadcaster.hpp
tf2_ros.uncrustify include/tf2_ros/transform_broadcaster.hpp
topic_monitor.topic_monitor.test.test_mypy test_mypy
Windows — 42 failure(s)
Package Test
CTest test_publisher_subscriber__rclpy__rmw_zenoh_cpp
CTest test_action_client_server__rclcpp__rmw_zenoh_cpp
CTest test_tutorial_parameter_events_async__rmw_connextdds
CTest test_async_clock
demo_nodes_cpp.TestExecutablesTutorial test_processes_output
rclpy.rclpy.test.test_async_clock test_sleep_multiple_concurrent_waiters
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_name
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_namespace
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_node_names_and_namespaces
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_node_names_and_namespaces_with_enclaves
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_topic_names_and_types
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_service_names_and_types
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_action_names_and_types
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_publisher_names_and_types_by_node
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_subscriber_names_and_types_by_node
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_service_names_and_types_by_node
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_client_names_and_types_by_node
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_action_server_names_and_types_by_node
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_action_client_names_and_types_by_node
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_publishers_info_by_topic
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_subscriptions_info_by_topic
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_clients_info_by_service
ros2cli.ros2cli.test.test_ros2cli_daemon test_get_servers_info_by_service
ros2cli.ros2cli.test.test_ros2cli_daemon test_count_publishers
ros2cli.ros2cli.test.test_ros2cli_daemon test_count_subscribers
ros2cli.ros2cli.test.test_ros2cli_daemon test_count_clients
ros2cli.ros2cli.test.test_ros2cli_daemon test_count_services
ros2cli.ros2cli.test.test_strategy test_with_daemon_running
ros2cli.ros2cli.test.test_strategy test_with_daemon_spawn
ros2cli.ros2cli.test.test_strategy test_enforce_no_daemon
ros2multicast.ros2multicast.test.test_api test_api
ros2multicast.ros2multicast.test.test_api test_group_and_port
ros2multicast.ros2multicast.test.test_api test_port_mismatch
sros2.test.sros2.commands.security.verbs.test_generate_policy test_generate_policy
test_communication.TestActionClientServer test_client_finishes_in_a_finite_amount_of_time[Fibonacci]
test_communication.TestActionClientServerAfterShutdown test_processes_finished_gracefully[Fibonacci]
test_communication.TestPublisherSubscriber test_subscriber_terminates_in_a_finite_amount_of_time[Builtins]
test_communication.TestPublisherSubscriber test_subscriber_terminates_in_a_finite_amount_of_time[Constants]
test_communication.TestPublisherSubscriber test_subscriber_terminates_in_a_finite_amount_of_time[Strings]
test_communication.TestPublisherSubscriberAfterShutdown test_processes_finished_gracefully[Builtins]
test_communication.TestPublisherSubscriberAfterShutdown test_processes_finished_gracefully[Constants]
test_communication.TestPublisherSubscriberAfterShutdown test_processes_finished_gracefully[Strings]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants