diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e31586..d17af24 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,6 +46,7 @@ add_library(${PROJECT_NAME} SHARED src/web_video_server.cpp src/multipart_stream.cpp src/streamer.cpp + src/subscriber.cpp src/utils.cpp ) @@ -61,6 +62,7 @@ target_link_libraries(${PROJECT_NAME} async_web_server_cpp::async_web_server_cpp pluginlib::pluginlib rclcpp::rclcpp + ${sensor_msgs_TARGETS} rmw::rmw Boost::boost PRIVATE @@ -68,7 +70,7 @@ target_link_libraries(${PROJECT_NAME} ) add_library(${PROJECT_NAME}_streamers SHARED - src/streamers/image_transport_streamer.cpp + src/streamers/image_streamer.cpp src/streamers/libav_streamer.cpp src/streamers/h264_streamer.cpp src/streamers/jpeg_streamers.cpp @@ -92,7 +94,6 @@ target_link_libraries(${PROJECT_NAME}_streamers ${PROJECT_NAME} async_web_server_cpp::async_web_server_cpp cv_bridge::cv_bridge - image_transport::image_transport pluginlib::pluginlib rclcpp::rclcpp ${sensor_msgs_TARGETS} @@ -105,6 +106,25 @@ target_link_libraries(${PROJECT_NAME}_streamers ${swscale_LIBRARIES} ) +add_library(${PROJECT_NAME}_subscribers SHARED + src/subscribers/image_transport_subscriber.cpp +) + +target_include_directories(${PROJECT_NAME}_subscribers + PUBLIC + "$" + "$" +) + +target_link_libraries(${PROJECT_NAME}_subscribers + ${PROJECT_NAME} + async_web_server_cpp::async_web_server_cpp + image_transport::image_transport + pluginlib::pluginlib + rclcpp::rclcpp + ${sensor_msgs_TARGETS} +) + ## Declare a cpp executable add_executable(${PROJECT_NAME}_node src/web_video_server_node.cpp @@ -116,7 +136,8 @@ target_link_libraries(${PROJECT_NAME}_node rclcpp_components_register_nodes(${PROJECT_NAME} "web_video_server::WebVideoServer") -pluginlib_export_plugin_description_file(web_video_server plugins.xml) +pluginlib_export_plugin_description_file(web_video_server streamer_plugins.xml) +pluginlib_export_plugin_description_file(web_video_server subscriber_plugins.xml) ############# ## Install ## @@ -128,7 +149,7 @@ install( ) install( - TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_streamers + TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_streamers ${PROJECT_NAME}_subscribers EXPORT export_${PROJECT_NAME} LIBRARY DESTINATION lib ARCHIVE DESTINATION lib diff --git a/README.md b/README.md index ef833ab..b85a18d 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,9 @@ This node provides HTTP streaming of ROS topics in various formats, making it ea ## Features +- Subscribe to ROS topics in multiple formats: + - image_transport + - Stream ROS image topics over HTTP in multiple formats: - MJPEG (Motion JPEG) - VP8 (WebM) @@ -11,11 +14,12 @@ This node provides HTTP streaming of ROS topics in various formats, making it ea - H264 (MP4) - PNG streams - ROS compressed image streams + - Query snapshots of image topics in multiple formats: - JPEG - PNG - ROS compressed image -- Plugin-based architecture for easy addition of new streaming formats +- Plugin-based architecture for easy addition of new subscribers or streamer formats - Adjustable quality, size, and other streaming parameters - Web interface to browse available image topics - Support for different QoS profiles in ROS 2 @@ -91,6 +95,7 @@ ros2 run web_video_server web_video_server | `server_threads` | int | 1 | 1+ | Number of server threads for handling HTTP requests | | `ros_threads` | int | 2 | 1+ | Number of threads for ROS message handling | | `verbose` | bool | false | true, false | Enable verbose logging | +| `default_qos_profile` | string | "default" | "default", "system_default", "sensor_data", "services_default" | QoS profile for ROS 2 subscribers | | `default_stream_type` | string | "mjpeg" | "mjpeg", "vp8", "vp9", "h264", "png", "ros_compressed" | Default format for video streams | | `publish_rate` | double | -1.0 | -1.0 or positive value | Rate for republishing images (-1.0 means no republishing) | @@ -181,6 +186,9 @@ http://localhost:8080/snapshot?topic=/camera/image_raw ## Creating custom streamer plugins See the [custom streamer plugin tutorial](doc/custom-streamer-plugin.md) for information on how to write your own streamer plugins. +## Creating custom subscriber plugins +See the [custom subscriber plugin tutorial](doc/custom-subscriber-plugin.md) for information on how to write your own subscriber plugins. + ## About This project is released as part of the [Robot Web Tools](https://robotwebtools.github.io/) effort. diff --git a/doc/custom-streamer-plugin.md b/doc/custom-streamer-plugin.md index 6057c75..518dbe5 100644 --- a/doc/custom-streamer-plugin.md +++ b/doc/custom-streamer-plugin.md @@ -15,10 +15,7 @@ This tutorial will guide you through the steps to create a simple custom streame 1. Add `TestStreamer` and `TestStreamerFactory` classes to `include/test_streamer_plugin/test_streamer_plugin.hpp` header file: ```cpp - #ifndef TEST_STREAMER_PLUGIN__TEST_STREAMER_PLUGIN_HPP_ - #define TEST_STREAMER_PLUGIN__TEST_STREAMER_PLUGIN_HPP_ - - #include "test_streamer_plugin/visibility_control.h" + #pragma once #include "web_video_server/streamer.hpp" @@ -52,7 +49,6 @@ This tutorial will guide you through the steps to create a simple custom streame } // namespace test_streamer_plugin - #endif // TEST_STREAMER_PLUGIN__TEST_STREAMER_PLUGIN_HPP_ ``` 1. Implement the `TestStreamer` and `TestStreamerFactory` classes in `src/test_streamer_plugin.cpp`: diff --git a/doc/custom-subscriber-plugin.md b/doc/custom-subscriber-plugin.md new file mode 100644 index 0000000..729811c --- /dev/null +++ b/doc/custom-subscriber-plugin.md @@ -0,0 +1,243 @@ +# How to write a custom subscriber plugin + +This tutorial will guide you through the steps to create a simple custom subscriber plugin for the `web_video_server` package in ROS 2. The example plugin will log messages when it is created, started, and when frames are restreamed. + +1. Create you local workspace if you don't have one: + ```bash + mkdir -p ~/ros_ws/src + cd ~/ros_ws/src + ``` +1. Create a new package for your custom subscriber plugin: + ```bash + ros2 pkg create --build-type ament_cmake test_subscriber_plugin --dependencies web_video_server pluginlib --library-name test_subscriber_plugin + cd test_subscriber_plugin + ``` + +1. Add `TestSubscriber` and `TestSubscriberFactory` classes to `include/test_subscriber_plugin/test_subscriber_plugin.hpp` header file: + ```cpp + #pragma once + + #include "web_video_server/subscriber.hpp" + + #include + #ifdef CV_BRIDGE_USES_OLD_HEADERS + #include + #else + #include + #endif + + // replace the following line with one appropriate for you data type + #include + + namespace test_subscriber_plugin + { + + class TestSubscriber : public web_video_server::SubscriberBase + { + public: + TestSubscriber(rclcpp::Node::SharedPtr node); + + ~TestSubscriber(); + + void subscribe(const async_web_server_cpp::HttpRequest &request, + const std::string& topic, + const web_video_server::ImageCallback& callback); + + private: + // replace param in the following line with one appropriate for you data type + void subscriber_callback(const std_msgs::msg::String::ConstSharedPtr &input_msg); + + // replace param in the following line with one appropriate for you data type + rclcpp::Subscription::SharedPtr sub_; + rclcpp::CallbackGroup::SharedPtr cbg_; + }; + + class TestSubscriberFactory : public web_video_server::SubscriberFactoryInterface + { + public: + // replace the text string below with one appropriate for you data type + // it should agree with value returned by the rclcpp function + // node.get_topic_names_and_types() + std::string get_type() override {return "std_msgs/msg/String";} + + std::shared_ptr create_subscriber( + rclcpp::Node::SharedPtr node); + + std::vector get_available_topics(rclcpp::Node & node); + }; + + } // namespace test_subscriber_plugin + ``` + +1. Implement the `TestSubscriber` and `TestSubscriberFactory` classes in `src/test_subscriber_plugin.cpp`: + ```cpp + #include "test_subscriber_plugin/test_subscriber_plugin.hpp" + + namespace test_subscriber_plugin + { + + TestSubscriber::TestSubscriber(rclcpp::Node::SharedPtr node) + : web_video_server::SubscriberBase(node, "test_subscriber") + { + const std::scoped_lock lock(subscriber_mutex); + + RCLCPP_INFO(logger_, "TestSubscriber created!"); + + // Declare any new parameters required for this subscriber + if (!node_->has_parameter("test_parameter")) node_->declare_parameter("test_parameter", "default"); + } + + TestSubscriber::~TestSubscriber() + { + const std::scoped_lock lock(subscriber_mutex); + inactive_ = true; + + RCLCPP_INFO(logger_, "TestSubscriber destroyed!"); + } + + void TestSubscriber::subscribe(const async_web_server_cpp::HttpRequest &request, + const std::string& topic, + const web_video_server::ImageCallback& callback) + { + const std::scoped_lock lock(subscriber_mutex); + + callback_ = callback; + + RCLCPP_INFO(logger_, "TestSubscriber started for topic: %s", topic.c_str()); + + // Load parameters used by this subscriber + std::string default_test_parameter = node_->get_parameter("test_parameter").as_string(); + std::string test_parameter = request.get_query_param_value_or_default("test_parameter", default_test_parameter); + + std::string default_qos_profile = node_->get_parameter("default_qos_profile").as_string(); + auto qos_profile_name = request.get_query_param_value_or_default("qos_profile", default_qos_profile); + + // Get QoS profile from query parameter + RCLCPP_INFO( + logger_, "Streaming topic %s with QoS profile %s", topic.c_str(), + qos_profile_name.c_str()); + auto qos_profile = web_video_server::get_qos_profile_from_name(qos_profile_name); + if (!qos_profile) { + qos_profile = rmw_qos_profile_default; + RCLCPP_ERROR( + logger_, + "Invalid QoS profile %s specified. Using default profile.", + qos_profile_name.c_str()); + } + + const auto qos = rclcpp::QoS( + rclcpp::QoSInitialization(qos_profile.value().history, 1), + qos_profile.value()); + + cbg_ = node_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); + rclcpp::SubscriptionOptions options; + options.callback_group = cbg_; + + // Create subscriber (update as appropriate for your subscriber) + sub_ = node_->create_subscription( + topic, qos, std::bind(&TestSubscriber::subscriber_callback, this, std::placeholders::_1), options + ); + } + + void TestSubscriber::subscriber_callback(const std_msgs::msg::String::ConstSharedPtr &input_msg) + { + const std::scoped_lock lock(subscriber_mutex); + + if(inactive_) return; + + RCLCPP_INFO_STREAM(logger_, "New TestSubscriber msg: " << input_msg->data); + + // Convert input msg to image + cv::Mat image(500, 1000, CV_8UC3, cv::Scalar(0, 0, 0)); + cv:putText(image, input_msg->data, cv::Point(30,250), cv::FONT_HERSHEY_SIMPLEX, 1.0, cv::Scalar(255, 0, 0), 2, cv::LINE_AA); + + // Send to streamer using callback + cv_bridge::CvImage bridge_image(std_msgs::msg::Header(), sensor_msgs::image_encodings::RGB8, image); + sensor_msgs::msg::Image output_msg; + bridge_image.toImageMsg(output_msg); + sensor_msgs::msg::Image::ConstSharedPtr output_ptr = std::make_shared(output_msg); + try_forward_image(output_ptr); + } + + std::shared_ptr TestSubscriberFactory::create_subscriber( + rclcpp::Node::SharedPtr node + ) { + return std::make_shared(node); + } + + std::vector TestSubscriberFactory::get_available_topics(rclcpp::Node & node) + { + std::vector result; + auto topic_names_and_types = node.get_topic_names_and_types(); + for (const auto & topic_and_types : topic_names_and_types) { + for (const auto & type : topic_and_types.second) { + if (type == this->get_type()) { + result.push_back(topic_and_types.first); + break; + } + } + } + return result; + } + + } // namespace test_subscriber_plugin + + #include "pluginlib/class_list_macros.hpp" + + PLUGINLIB_EXPORT_CLASS( + test_subscriber_plugin::TestSubscriberFactory, + web_video_server::SubscriberFactoryInterface) + ``` + +1. Add `plugins.xml` file with plugin description: + ```xml + + + Test subscriber implementation + + + ``` + +1. Update `CMakeLists.txt` to export the plugin description file (Add this anywhere after `find_package` section): + ```cmake + pluginlib_export_plugin_description_file(web_video_server plugins.xml) + ``` + +1. Build your package: + ```bash + cd ~/ros_ws + colcon build --packages-select test_subscriber_plugin + source install/setup.bash + ``` + +1. Run the `web_video_server` node and test your custom subscriber plugin by accessing a topic of the appropriate format: + ```bash + ros2 topic pub /your_topic std_msgs/msg/String "data: test" + ros2 run web_video_server web_video_server --ros-args -p port:=8082 -p address:=localhost + ``` + Then open your web browser and navigate to: + ``` + http://localhost:8082/stream?topic=/your_topic + ``` + +## Implementation hints +- You can access query parameters from the HTTP request in your subscriber constructor using `request.get_query_param_value_or_default` method. +- Use `logger_` member variable from the base `SubscriberBase` class for logging. +- Link specific targets in `CMakeLists.txt`. For example, replace: + ```cmake + target_link_libraries( + test_subscriber_plugin PUBLIC + ${web_video_server_TARGETS} + ${pluginlib_TARGETS} + ) + ``` + with: + ```cmake + target_link_libraries( + test_subscriber_plugin + web_video_server::web_video_server + pluginlib::pluginlib + ) + ``` \ No newline at end of file diff --git a/include/web_video_server/streamer.hpp b/include/web_video_server/streamer.hpp index cacda2e..78e3d2c 100644 --- a/include/web_video_server/streamer.hpp +++ b/include/web_video_server/streamer.hpp @@ -40,6 +40,8 @@ #include "rclcpp/logger.hpp" #include "rclcpp/node.hpp" +#include "web_video_server/subscriber.hpp" + namespace web_video_server { @@ -84,9 +86,12 @@ class StreamerBase : public StreamerInterface StreamerBase( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node, std::string logger_name = "streamer"); + std::mutex send_mutex; + bool is_inactive() override { return inactive_; @@ -106,6 +111,8 @@ class StreamerBase : public StreamerInterface rclcpp::Logger logger_; bool inactive_; std::string topic_; + std::map> subscriber_factories_; + std::shared_ptr subscriber_; }; /** @@ -133,7 +140,9 @@ class StreamerFactoryInterface virtual std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::WeakPtr node) = 0; + std::map> & subscriber_factories, + rclcpp::Node::WeakPtr node + ) = 0; /** * @brief Creates HTML code for embedding a viewer for this streamer. @@ -146,7 +155,10 @@ class StreamerFactoryInterface * @param node The ROS2 node to use for discovering topics. * @return A vector of topic names. */ - virtual std::vector get_available_topics(rclcpp::Node & node); + virtual std::vector get_available_topics( + rclcpp::Node & node, + std::map> subscriber_factories + ); }; /** diff --git a/include/web_video_server/streamers/h264_streamer.hpp b/include/web_video_server/streamers/h264_streamer.hpp index e6cc088..88344ca 100644 --- a/include/web_video_server/streamers/h264_streamer.hpp +++ b/include/web_video_server/streamers/h264_streamer.hpp @@ -50,6 +50,7 @@ class H264Streamer : public LibavStreamerBase H264Streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); ~H264Streamer(); @@ -65,6 +66,7 @@ class H264StreamerFactory : public LibavStreamerFactoryBase std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); }; diff --git a/include/web_video_server/streamers/image_transport_streamer.hpp b/include/web_video_server/streamers/image_streamer.hpp similarity index 77% rename from include/web_video_server/streamers/image_transport_streamer.hpp rename to include/web_video_server/streamers/image_streamer.hpp index 141c6d7..15a3049 100644 --- a/include/web_video_server/streamers/image_transport_streamer.hpp +++ b/include/web_video_server/streamers/image_streamer.hpp @@ -31,7 +31,6 @@ #include #include -#include #include #include @@ -39,8 +38,6 @@ #include "async_web_server_cpp/http_connection.hpp" #include "async_web_server_cpp/http_request.hpp" -#include "image_transport/image_transport.hpp" -#include "image_transport/subscriber.hpp" #include "rclcpp/node.hpp" #include "sensor_msgs/msg/image.hpp" @@ -52,18 +49,19 @@ namespace streamers { /** - * @brief A common base class for all streaming plugins using image_transport to subscribe to image + * @brief A common base class for all streaming plugins using an image subscriber to get an image * topics. */ -class ImageTransportStreamerBase : public StreamerBase +class ImageStreamerBase : public StreamerBase { public: - ImageTransportStreamerBase( + ImageStreamerBase( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node, - std::string logger_name = "image_transport_streamer"); - virtual ~ImageTransportStreamerBase(); + std::string logger_name = "image_streamer"); + virtual ~ImageStreamerBase(); virtual void start(); virtual void restream_frame(std::chrono::duration max_age); @@ -75,16 +73,12 @@ class ImageTransportStreamerBase : public StreamerBase const std::chrono::steady_clock::time_point & time) = 0; virtual void initialize(const cv::Mat & img); - image_transport::Subscriber image_sub_; int output_width_; int output_height_; bool invert_; - std::string default_transport_; - std::string qos_profile_name_; std::chrono::steady_clock::time_point last_frame_; cv::Mat output_size_image_; - std::mutex send_mutex_; private: bool initialized_; @@ -95,16 +89,22 @@ class ImageTransportStreamerBase : public StreamerBase rclcpp::Node & node); }; -class ImageTransportStreamerFactoryBase : public StreamerFactoryInterface +class ImageStreamerFactoryBase : public StreamerFactoryInterface { public: - virtual std::vector get_available_topics(rclcpp::Node & node); + virtual std::vector get_available_topics( + rclcpp::Node & node, + std::map> subscriber_factories + ); }; -class ImageTransportSnapshotStreamerFactoryBase : public SnapshotStreamerFactoryInterface +class ImageSnapshotStreamerFactoryBase : public SnapshotStreamerFactoryInterface { public: - virtual std::vector get_available_topics(rclcpp::Node & node); + virtual std::vector get_available_topics( + rclcpp::Node & node, + std::map> subscriber_factories + ); }; } // namespace streamers diff --git a/include/web_video_server/streamers/jpeg_streamers.hpp b/include/web_video_server/streamers/jpeg_streamers.hpp index 5b9e1ba..0d8460d 100644 --- a/include/web_video_server/streamers/jpeg_streamers.hpp +++ b/include/web_video_server/streamers/jpeg_streamers.hpp @@ -42,19 +42,20 @@ #include "web_video_server/multipart_stream.hpp" #include "web_video_server/streamer.hpp" -#include "web_video_server/streamers/image_transport_streamer.hpp" +#include "web_video_server/streamers/image_streamer.hpp" namespace web_video_server { namespace streamers { -class MjpegStreamer : public ImageTransportStreamerBase +class MjpegStreamer : public ImageStreamerBase { public: MjpegStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); ~MjpegStreamer(); @@ -66,22 +67,24 @@ class MjpegStreamer : public ImageTransportStreamerBase int quality_; }; -class MjpegStreamerFactory : public ImageTransportStreamerFactoryBase +class MjpegStreamerFactory : public ImageStreamerFactoryBase { public: std::string get_type() {return "mjpeg";} std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); }; -class JpegSnapshotStreamer : public ImageTransportStreamerBase +class JpegSnapshotStreamer : public ImageStreamerBase { public: JpegSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); ~JpegSnapshotStreamer(); @@ -92,7 +95,7 @@ class JpegSnapshotStreamer : public ImageTransportStreamerBase int quality_; }; -class JpegSnapshotStreamerFactory : public ImageTransportSnapshotStreamerFactoryBase +class JpegSnapshotStreamerFactory : public ImageSnapshotStreamerFactoryBase { public: std::string get_type() {return "jpeg";} @@ -100,6 +103,7 @@ class JpegSnapshotStreamerFactory : public ImageTransportSnapshotStreamerFactory std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); }; diff --git a/include/web_video_server/streamers/libav_streamer.hpp b/include/web_video_server/streamers/libav_streamer.hpp index 02b23e8..039958e 100644 --- a/include/web_video_server/streamers/libav_streamer.hpp +++ b/include/web_video_server/streamers/libav_streamer.hpp @@ -51,7 +51,7 @@ extern "C" #include "async_web_server_cpp/http_request.hpp" #include "rclcpp/node.hpp" -#include "web_video_server/streamers/image_transport_streamer.hpp" +#include "web_video_server/streamers/image_streamer.hpp" namespace web_video_server { @@ -59,21 +59,21 @@ namespace streamers { /** - * @brief A common base class for all streaming plugins using image_transport to subscribe to image + * @brief A common base class for all streaming plugins using an image subscriber to get image * topics and libav to encode and stream video. */ -class LibavStreamerBase : public ImageTransportStreamerBase +class LibavStreamerBase : public ImageStreamerBase { public: LibavStreamerBase( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node, std::string logger_name, const std::string & format_name, const std::string & codec_name, const std::string & content_type); - ~LibavStreamerBase(); protected: @@ -105,7 +105,7 @@ class LibavStreamerBase : public ImageTransportStreamerBase uint8_t * io_buffer_; // custom IO buffer }; -class LibavStreamerFactoryBase : public ImageTransportStreamerFactoryBase +class LibavStreamerFactoryBase : public ImageStreamerFactoryBase { public: virtual std::string create_viewer(const async_web_server_cpp::HttpRequest & request); diff --git a/include/web_video_server/streamers/png_streamers.hpp b/include/web_video_server/streamers/png_streamers.hpp index ef2cde4..f0bc19f 100644 --- a/include/web_video_server/streamers/png_streamers.hpp +++ b/include/web_video_server/streamers/png_streamers.hpp @@ -42,19 +42,20 @@ #include "web_video_server/multipart_stream.hpp" #include "web_video_server/streamer.hpp" -#include "web_video_server/streamers/image_transport_streamer.hpp" +#include "web_video_server/streamers/image_streamer.hpp" namespace web_video_server { namespace streamers { -class PngStreamer : public ImageTransportStreamerBase +class PngStreamer : public ImageStreamerBase { public: PngStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); ~PngStreamer(); @@ -67,22 +68,24 @@ class PngStreamer : public ImageTransportStreamerBase int quality_; }; -class PngStreamerFactory : public ImageTransportStreamerFactoryBase +class PngStreamerFactory : public ImageStreamerFactoryBase { public: std::string get_type() {return "png";} std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); }; -class PngSnapshotStreamer : public ImageTransportStreamerBase +class PngSnapshotStreamer : public ImageStreamerBase { public: PngSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); ~PngSnapshotStreamer(); @@ -94,7 +97,7 @@ class PngSnapshotStreamer : public ImageTransportStreamerBase int quality_; }; -class PngSnapshotStreamerFactory : public ImageTransportSnapshotStreamerFactoryBase +class PngSnapshotStreamerFactory : public ImageSnapshotStreamerFactoryBase { public: std::string get_type() {return "png";} @@ -102,6 +105,7 @@ class PngSnapshotStreamerFactory : public ImageTransportSnapshotStreamerFactoryB std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); }; diff --git a/include/web_video_server/streamers/ros_compressed_streamer.hpp b/include/web_video_server/streamers/ros_compressed_streamer.hpp index 9b20b96..9fff80a 100644 --- a/include/web_video_server/streamers/ros_compressed_streamer.hpp +++ b/include/web_video_server/streamers/ros_compressed_streamer.hpp @@ -56,6 +56,7 @@ class RosCompressedStreamer : public StreamerBase RosCompressedStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); ~RosCompressedStreamer(); virtual void start(); @@ -72,7 +73,6 @@ class RosCompressedStreamer : public StreamerBase rclcpp::Subscription::SharedPtr image_sub_; std::chrono::steady_clock::time_point last_frame_; sensor_msgs::msg::CompressedImage::ConstSharedPtr last_msg_; - std::mutex send_mutex_; std::string qos_profile_name_; }; @@ -83,8 +83,12 @@ class RosCompressedStreamerFactory : public StreamerFactoryInterface std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); - std::vector get_available_topics(rclcpp::Node & node); + std::vector get_available_topics( + rclcpp::Node & node, + std::map> subscriber_factories + ); }; class RosCompressedSnapshotStreamer : public StreamerBase @@ -93,6 +97,7 @@ class RosCompressedSnapshotStreamer : public StreamerBase RosCompressedSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); ~RosCompressedSnapshotStreamer(); virtual void start(); @@ -117,8 +122,12 @@ class RosCompressedSnapshotStreamerFactory : public SnapshotStreamerFactoryInter std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); - std::vector get_available_topics(rclcpp::Node & node); + std::vector get_available_topics( + rclcpp::Node & node, + std::map> subscriber_factories + ); }; } // namespace streamers diff --git a/include/web_video_server/streamers/vp8_streamer.hpp b/include/web_video_server/streamers/vp8_streamer.hpp index aeee06a..93461c0 100644 --- a/include/web_video_server/streamers/vp8_streamer.hpp +++ b/include/web_video_server/streamers/vp8_streamer.hpp @@ -51,6 +51,7 @@ class Vp8Streamer : public LibavStreamerBase Vp8Streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); ~Vp8Streamer(); @@ -68,6 +69,7 @@ class Vp8StreamerFactory : public LibavStreamerFactoryBase std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); }; diff --git a/include/web_video_server/streamers/vp9_streamer.hpp b/include/web_video_server/streamers/vp9_streamer.hpp index 97d5e73..e7c6c5f 100644 --- a/include/web_video_server/streamers/vp9_streamer.hpp +++ b/include/web_video_server/streamers/vp9_streamer.hpp @@ -50,6 +50,7 @@ class Vp9Streamer : public LibavStreamerBase Vp9Streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); ~Vp9Streamer(); @@ -64,6 +65,7 @@ class Vp9StreamerFactory : public LibavStreamerFactoryBase std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node); }; diff --git a/include/web_video_server/subscriber.hpp b/include/web_video_server/subscriber.hpp new file mode 100644 index 0000000..55d609a --- /dev/null +++ b/include/web_video_server/subscriber.hpp @@ -0,0 +1,137 @@ +// Copyright (c) 2014, Worcester Polytechnic Institute +// Copyright (c) 2024-2025, The Robot Web Tools Contributors +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include +#include +#include +#include + +#include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_request.hpp" + +#include "rclcpp/logger.hpp" +#include "rclcpp/node.hpp" + +#include "sensor_msgs/msg/image.hpp" + +namespace web_video_server +{ +typedef std::function ImageCallback; + +class SubscriberInterface +{ +public: + virtual ~SubscriberInterface() {} + + /** + * @brief Starts the streaming process + */ + virtual void subscribe( + const async_web_server_cpp::HttpRequest & request, + const std::string & topic, + const ImageCallback & callback) = 0; + +protected: + virtual void subscriber_callback(const sensor_msgs::msg::Image::ConstSharedPtr & input_msg) = 0; +}; + +/** + * @brief A base class providing common functionality for Subscribers. + */ +class SubscriberBase : public SubscriberInterface +{ +public: + explicit SubscriberBase( + rclcpp::Node::SharedPtr node, + std::string logger_name = "subscriber"); + + std::mutex subscriber_mutex; + + void subscribe( + const async_web_server_cpp::HttpRequest & request, + const std::string & topic, + const ImageCallback & callback) override; + + void try_forward_image(const sensor_msgs::msg::Image::ConstSharedPtr & input_msg) + { + try { + callback_(input_msg); + } catch (...) { + RCLCPP_ERROR(logger_, "The subscriber plugin failed send image for some reason."); + } + } + +protected: + void subscriber_callback(const sensor_msgs::msg::Image::ConstSharedPtr & input_msg) override; + + rclcpp::Node::SharedPtr node_; + rclcpp::Logger logger_; + bool inactive_; + + ImageCallback callback_; + rclcpp::Subscription::SharedPtr sub_; +}; + +/** + * @brief A factory interface for creating Subscriber instances. + */ +class SubscriberFactoryInterface +{ +public: + virtual ~SubscriberFactoryInterface() = default; + + /** + * @brief Returns the type of Subscriber created by this factory. + * + * This should match the "type" query parameter used to select the Subscriber. + */ + virtual std::string get_type() = 0; + + /** + * @brief Creates a new Subscriber instance. + * @param request The HTTP request that initiated the Subscriber. + * @param node The ROS2 node to use for subscribing to topics. + * @return A shared pointer to the created Subscriber instance. + */ + virtual std::shared_ptr create_subscriber( + rclcpp::Node::SharedPtr node) = 0; + + /** + * @brief Returns a list of available topics that can be streamed by this subscriber. + * @param node The ROS2 node to use for discovering topics. + * @return A vector of topic names. + */ + virtual std::vector get_available_topics(rclcpp::Node & node); +}; + +} // end namespace web_video_server diff --git a/include/web_video_server/subscribers/image_transport_subscriber.hpp b/include/web_video_server/subscribers/image_transport_subscriber.hpp new file mode 100644 index 0000000..e8ff8a0 --- /dev/null +++ b/include/web_video_server/subscribers/image_transport_subscriber.hpp @@ -0,0 +1,76 @@ +// Copyright (c) 2024-2026, The Robot Web Tools Contributors +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include "async_web_server_cpp/http_request.hpp" + +#include "rclcpp/node.hpp" + +#include "image_transport/image_transport.hpp" +#include "image_transport/subscriber.hpp" + +#include "web_video_server/subscriber.hpp" + +namespace web_video_server +{ +namespace subscribers +{ + +class ImageTransportSubscriber : public SubscriberBase +{ +public: + explicit ImageTransportSubscriber(rclcpp::Node::SharedPtr node); + + ~ImageTransportSubscriber(); + + void subscribe( + const async_web_server_cpp::HttpRequest & request, + const std::string & topic, + const ImageCallback & callback); + +private: + void subscriber_callback(const sensor_msgs::msg::Image::ConstSharedPtr & input_msg); + + image_transport::Subscriber sub_; +}; + +class ImageTransportSubscriberFactory : public SubscriberFactoryInterface +{ +public: + std::string get_type() {return "sensor_msgs/msg/Image";} + + std::shared_ptr create_subscriber( + rclcpp::Node::SharedPtr node); + + std::vector get_available_topics(rclcpp::Node & node); +}; + +} // namespace subscribers +} // namespace web_video_server diff --git a/include/web_video_server/web_video_server.hpp b/include/web_video_server/web_video_server.hpp index c7e4c4a..84e3fc9 100644 --- a/include/web_video_server/web_video_server.hpp +++ b/include/web_video_server/web_video_server.hpp @@ -118,6 +118,9 @@ class WebVideoServer : public rclcpp::Node pluginlib::ClassLoader snapshot_streamer_factory_loader_; std::map> snapshot_streamer_factories_; std::mutex streamers_mutex_; + + pluginlib::ClassLoader subscriber_factory_loader_; + std::map> subscriber_factories_; }; } // namespace web_video_server diff --git a/src/streamer.cpp b/src/streamer.cpp index b7026eb..3e8004b 100644 --- a/src/streamer.cpp +++ b/src/streamer.cpp @@ -29,11 +29,13 @@ // POSSIBILITY OF SUCH DAMAGE. #include "web_video_server/streamer.hpp" +#include "web_video_server/subscriber.hpp" #include #include #include -#include +#include +#include #include "rclcpp/node.hpp" #include "rclcpp/logging.hpp" @@ -47,11 +49,16 @@ namespace web_video_server StreamerBase::StreamerBase( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node, std::string logger_name) -: connection_(connection), request_(request), node_(std::move(node)), - logger_(node_.lock()->get_logger().get_child(logger_name)), inactive_(false), - topic_(request.get_query_param_value_or_default("topic", "")) +: connection_(connection), + request_(request), + node_(node), + logger_(node_.lock()->get_logger().get_child(logger_name)), + inactive_(false), + topic_(request.get_query_param_value_or_default("topic", "")), + subscriber_factories_(subscriber_factories) { } @@ -75,7 +82,8 @@ std::string StreamerFactoryInterface::create_viewer( } std::vector StreamerFactoryInterface::get_available_topics( - rclcpp::Node & /* node */) + rclcpp::Node & /*node*/, + std::map>/*subscriber_factories*/) { return {}; } diff --git a/src/streamers/h264_streamer.cpp b/src/streamers/h264_streamer.cpp index a96d872..596449b 100644 --- a/src/streamers/h264_streamer.cpp +++ b/src/streamers/h264_streamer.cpp @@ -37,8 +37,10 @@ extern "C" #include } +#include #include #include +#include #include "async_web_server_cpp/http_connection.hpp" #include "async_web_server_cpp/http_request.hpp" @@ -46,6 +48,7 @@ extern "C" #include "web_video_server/streamer.hpp" #include "web_video_server/streamers/libav_streamer.hpp" +#include "web_video_server/subscriber.hpp" namespace web_video_server { @@ -54,8 +57,11 @@ namespace streamers H264Streamer::H264Streamer( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::WeakPtr node) -: LibavStreamerBase(request, connection, node, "h264_streamer", "mp4", "libx264", "video/mp4") + async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, + rclcpp::Node::WeakPtr node) +: LibavStreamerBase(request, connection, subscriber_factories, node, + "h264_streamer", "mp4", "libx264", "video/mp4") { /* possible quality presets: * ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow, placebo @@ -87,9 +93,10 @@ void H264Streamer::initialize_encoder() std::shared_ptr H264StreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) { - return std::make_shared(request, connection, node); + return std::make_shared(request, connection, subscriber_factories, node); } } // namespace streamers diff --git a/src/streamers/image_transport_streamer.cpp b/src/streamers/image_streamer.cpp similarity index 67% rename from src/streamers/image_transport_streamer.cpp rename to src/streamers/image_streamer.cpp index 5369276..f472bbd 100644 --- a/src/streamers/image_transport_streamer.cpp +++ b/src/streamers/image_streamer.cpp @@ -28,7 +28,7 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. -#include "web_video_server/streamers/image_transport_streamer.hpp" +#include "web_video_server/streamers/image_streamer.hpp" #include #include @@ -36,6 +36,8 @@ #include #include #include +#include +#include #include #include @@ -51,66 +53,37 @@ #include "async_web_server_cpp/http_connection.hpp" #include "async_web_server_cpp/http_request.hpp" -#include "image_transport/image_transport.hpp" -#include "image_transport/transport_hints.hpp" #include "rclcpp/node.hpp" #include "rclcpp/logging.hpp" -#include "rmw/qos_profiles.h" #include "sensor_msgs/msg/image.hpp" #include "web_video_server/streamer.hpp" -#include "web_video_server/utils.hpp" +#include "web_video_server/subscriber.hpp" namespace web_video_server { namespace streamers { -namespace -{ - -std::vector get_image_topics(rclcpp::Node & node) -{ - std::vector result; - auto topic_names_and_types = node.get_topic_names_and_types(); - for (const auto & topic_and_types : topic_names_and_types) { - for (const auto & type : topic_and_types.second) { - if (type == "sensor_msgs/msg/Image") { - result.push_back(topic_and_types.first); - break; - } - } - } - return result; -} - -} // namespace - -ImageTransportStreamerBase::ImageTransportStreamerBase( +ImageStreamerBase::ImageStreamerBase( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node, std::string logger_name) -: StreamerBase(request, connection, node, logger_name), initialized_(false) +: StreamerBase(request, connection, subscriber_factories, node, logger_name), + initialized_(false) { output_width_ = request.get_query_param_value_or_default("width", -1); output_height_ = request.get_query_param_value_or_default("height", -1); invert_ = request.has_query_param("invert"); - default_transport_ = request.get_query_param_value_or_default("default_transport", "raw"); - qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default"); } -ImageTransportStreamerBase::~ImageTransportStreamerBase() +ImageStreamerBase::~ImageStreamerBase() { } -// We disable deprecation warnings for image_transport API usage -// to maintain compatibility with older ROS 2 distributions. -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" -// NOLINTBEGIN(clang-diagnostic-deprecated-declarations) - -void ImageTransportStreamerBase::start() +void ImageStreamerBase::start() { auto node = lock_node(); if (!node) { @@ -118,7 +91,6 @@ void ImageTransportStreamerBase::start() return; } - const image_transport::TransportHints hints(node.get(), default_transport_); auto tnat = node->get_topic_names_and_types(); inactive_ = true; for (auto topic_and_types : tnat) { @@ -127,40 +99,25 @@ void ImageTransportStreamerBase::start() continue; } const auto & topic_name = topic_and_types.first; + const auto & topic_type = topic_and_types.second[0]; if (topic_name == topic_ || (topic_name.find("/") == 0 && topic_name.substr(1) == topic_)) { inactive_ = false; + + subscriber_ = subscriber_factories_[topic_type]->create_subscriber(node); + subscriber_->subscribe( + request_, topic_, + std::bind(&ImageStreamerBase::image_callback, this, std::placeholders::_1)); + break; } } - - // Get QoS profile from query parameter - RCLCPP_INFO( - logger_, "Streaming topic %s with QoS profile %s", topic_.c_str(), - qos_profile_name_.c_str()); - auto qos_profile = get_qos_profile_from_name(qos_profile_name_); - if (!qos_profile) { - qos_profile = rmw_qos_profile_default; - RCLCPP_ERROR( - logger_, - "Invalid QoS profile %s specified. Using default profile.", - qos_profile_name_.c_str()); - } - - // Create subscriber - image_sub_ = image_transport::create_subscription( - node.get(), topic_, - std::bind(&ImageTransportStreamerBase::image_callback, this, std::placeholders::_1), - default_transport_, qos_profile.value()); } -#pragma GCC diagnostic pop -// NOLINTEND(clang-diagnostic-deprecated-declarations) - -void ImageTransportStreamerBase::initialize(const cv::Mat & /*img*/) +void ImageStreamerBase::initialize(const cv::Mat & /*img*/) { } -void ImageTransportStreamerBase::restream_frame(std::chrono::duration/* max_age */) +void ImageStreamerBase::restream_frame(std::chrono::duration/* max_age */) { if (inactive_ || !initialized_) { return; @@ -175,7 +132,7 @@ void ImageTransportStreamerBase::restream_frame(std::chrono::duration/* try_send_image(output_size_image_, last_frame_, *node); } -void ImageTransportStreamerBase::image_callback(const sensor_msgs::msg::Image::ConstSharedPtr & msg) +void ImageStreamerBase::image_callback(const sensor_msgs::msg::Image::ConstSharedPtr & msg) { if (inactive_) { return; @@ -206,7 +163,7 @@ void ImageTransportStreamerBase::image_callback(const sensor_msgs::msg::Image::C cv::flip(img, img, 1); } - const std::scoped_lock lock(send_mutex_); // protects output_size_image_ + const std::scoped_lock lock(send_mutex); // protects output_size_image_ if (output_width_ != input_width || output_height_ != input_height) { cv::Mat img_resized; const cv::Size new_size(output_width_, output_height_); @@ -237,13 +194,13 @@ void ImageTransportStreamerBase::image_callback(const sensor_msgs::msg::Image::C try_send_image(output_size_image_, last_frame_, *node); } -void ImageTransportStreamerBase::try_send_image( +void ImageStreamerBase::try_send_image( const cv::Mat & img, const std::chrono::steady_clock::time_point & /* time */, rclcpp::Node & node) { try { - const std::scoped_lock lock(send_mutex_); + const std::scoped_lock lock(send_mutex); send_image(img, std::chrono::steady_clock::now()); } catch (boost::system::system_error & e) { // happens when client disconnects @@ -263,7 +220,7 @@ void ImageTransportStreamerBase::try_send_image( } } -cv::Mat ImageTransportStreamerBase::decode_image( +cv::Mat ImageStreamerBase::decode_image( const sensor_msgs::msg::Image::ConstSharedPtr & msg) { if (msg->encoding.find("F") != std::string::npos) { @@ -282,16 +239,32 @@ cv::Mat ImageTransportStreamerBase::decode_image( return cv_bridge::toCvCopy(msg, "bgr8")->image; } -std::vector ImageTransportStreamerFactoryBase::get_available_topics( - rclcpp::Node & node) +std::vector ImageStreamerFactoryBase::get_available_topics( + rclcpp::Node & node, + std::map> subscriber_factories) { - return get_image_topics(node); + std::vector results; + + for (auto subscriber: subscriber_factories) { + std::vector entries = subscriber.second->get_available_topics(node); + results.insert(results.end(), entries.begin(), entries.end()); + } + + return results; } -std::vector ImageTransportSnapshotStreamerFactoryBase::get_available_topics( - rclcpp::Node & node) +std::vector ImageSnapshotStreamerFactoryBase::get_available_topics( + rclcpp::Node & node, + std::map> subscriber_factories) { - return get_image_topics(node); + std::vector results; + + for (auto subscriber: subscriber_factories) { + std::vector entries = subscriber.second->get_available_topics(node); + results.insert(results.end(), entries.begin(), entries.end()); + } + + return results; } } // namespace streamers diff --git a/src/streamers/jpeg_streamers.cpp b/src/streamers/jpeg_streamers.cpp index c0abf0f..4b93d52 100644 --- a/src/streamers/jpeg_streamers.cpp +++ b/src/streamers/jpeg_streamers.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -49,7 +50,8 @@ #include "rclcpp/node.hpp" #include "web_video_server/streamer.hpp" -#include "web_video_server/streamers/image_transport_streamer.hpp" +#include "web_video_server/streamers/image_streamer.hpp" +#include "web_video_server/subscriber.hpp" namespace web_video_server { @@ -58,8 +60,10 @@ namespace streamers MjpegStreamer::MjpegStreamer( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::WeakPtr node) -: ImageTransportStreamerBase(request, connection, node, "mjpeg_streamer"), + async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, + rclcpp::Node::WeakPtr node) +: ImageStreamerBase(request, connection, subscriber_factories, node, "mjpeg_streamer"), stream_(connection) { quality_ = request.get_query_param_value_or_default("quality", 95); @@ -69,7 +73,7 @@ MjpegStreamer::MjpegStreamer( MjpegStreamer::~MjpegStreamer() { this->inactive_ = true; - const std::scoped_lock lock(send_mutex_); // protects send_image. + const std::scoped_lock lock(send_mutex); // protects send_image. } void MjpegStreamer::send_image( @@ -89,16 +93,18 @@ void MjpegStreamer::send_image( std::shared_ptr MjpegStreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) { - return std::make_shared(request, connection, node); + return std::make_shared(request, connection, subscriber_factories, node); } JpegSnapshotStreamer::JpegSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) -: ImageTransportStreamerBase(request, connection, node, "jpeg_snapshot_streamer") +: ImageStreamerBase(request, connection, subscriber_factories, node, "jpeg_snapshot_streamer") { quality_ = request.get_query_param_value_or_default("quality", 95); } @@ -106,7 +112,7 @@ JpegSnapshotStreamer::JpegSnapshotStreamer( JpegSnapshotStreamer::~JpegSnapshotStreamer() { this->inactive_ = true; - const std::scoped_lock lock(send_mutex_); // protects send_image. + const std::scoped_lock lock(send_mutex); // protects send_image. } void JpegSnapshotStreamer::send_image( @@ -143,9 +149,12 @@ void JpegSnapshotStreamer::send_image( std::shared_ptr JpegSnapshotStreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) { - return std::make_shared(request, connection, std::move(node)); + return std::make_shared( + request, connection, subscriber_factories, + std::move(node)); } } // namespace streamers diff --git a/src/streamers/libav_streamer.cpp b/src/streamers/libav_streamer.cpp index 06e0a45..ee26293 100644 --- a/src/streamers/libav_streamer.cpp +++ b/src/streamers/libav_streamer.cpp @@ -56,6 +56,8 @@ extern "C" #include #include #include +#include +#include #include @@ -65,7 +67,8 @@ extern "C" #include "rclcpp/node.hpp" #include "rclcpp/logging.hpp" -#include "web_video_server/streamers/image_transport_streamer.hpp" +#include "web_video_server/streamers/image_streamer.hpp" +#include "web_video_server/subscriber.hpp" // https://stackoverflow.com/questions/46884682/error-in-building-opencv-with-ffmpeg #define AV_CODEC_FLAG_GLOBAL_HEADER (1 << 22) @@ -78,13 +81,15 @@ namespace streamers LibavStreamerBase::LibavStreamerBase( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::WeakPtr node, + async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, + rclcpp::Node::WeakPtr node, std::string logger_name, const std::string & format_name, const std::string & codec_name, const std::string & content_type) -: ImageTransportStreamerBase(request, connection, node, logger_name), format_context_(0), codec_(0), - codec_context_(0), video_stream_(0), opt_(0), frame_(0), sws_context_(0), - first_image_received_(false), format_name_(format_name), codec_name_(codec_name), - content_type_(content_type), io_buffer_(0) +: ImageStreamerBase(request, connection, subscriber_factories, node, logger_name), + format_context_(0), codec_(0), codec_context_(0), video_stream_(0), opt_(0), + frame_(0), sws_context_(0), first_image_received_(false), format_name_(format_name), + codec_name_(codec_name), content_type_(content_type), io_buffer_(0) { bitrate_ = request.get_query_param_value_or_default("bitrate", 100000); qmin_ = request.get_query_param_value_or_default("qmin", 10); diff --git a/src/streamers/png_streamers.cpp b/src/streamers/png_streamers.cpp index 3a0b832..dd232a2 100644 --- a/src/streamers/png_streamers.cpp +++ b/src/streamers/png_streamers.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -49,7 +50,8 @@ #include "sensor_msgs/msg/image.hpp" #include "web_video_server/streamer.hpp" -#include "web_video_server/streamers/image_transport_streamer.hpp" +#include "web_video_server/streamers/image_streamer.hpp" +#include "web_video_server/subscriber.hpp" #ifdef CV_BRIDGE_USES_OLD_HEADERS #include "cv_bridge/cv_bridge.h" @@ -64,8 +66,11 @@ namespace streamers PngStreamer::PngStreamer( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::WeakPtr node) -: ImageTransportStreamerBase(request, connection, node, "png_streamer"), stream_(connection) + async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, + rclcpp::Node::WeakPtr node) +: ImageStreamerBase(request, connection, subscriber_factories, node, "png_streamer"), + stream_(connection) { quality_ = request.get_query_param_value_or_default("quality", 3); stream_.send_initial_header(); @@ -74,7 +79,7 @@ PngStreamer::PngStreamer( PngStreamer::~PngStreamer() { this->inactive_ = true; - const std::scoped_lock lock(send_mutex_); // protects send_image. + const std::scoped_lock lock(send_mutex); // protects send_image. } cv::Mat PngStreamer::decode_image(const sensor_msgs::msg::Image::ConstSharedPtr & msg) @@ -84,7 +89,7 @@ cv::Mat PngStreamer::decode_image(const sensor_msgs::msg::Image::ConstSharedPtr return cv_bridge::toCvCopy(msg, "bgra8")->image; } // Use the normal decode otherwise - return ImageTransportStreamerBase::decode_image(msg); + return ImageStreamerBase::decode_image(msg); } void PngStreamer::send_image( @@ -104,16 +109,18 @@ void PngStreamer::send_image( std::shared_ptr PngStreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) { - return std::make_shared(request, connection, node); + return std::make_shared(request, connection, subscriber_factories, node); } PngSnapshotStreamer::PngSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) -: ImageTransportStreamerBase(request, connection, node) +: ImageStreamerBase(request, connection, subscriber_factories, node) { quality_ = request.get_query_param_value_or_default("quality", 3); } @@ -121,7 +128,7 @@ PngSnapshotStreamer::PngSnapshotStreamer( PngSnapshotStreamer::~PngSnapshotStreamer() { this->inactive_ = true; - const std::scoped_lock lock(send_mutex_); // protects send_image. + const std::scoped_lock lock(send_mutex); // protects send_image. } cv::Mat PngSnapshotStreamer::decode_image(const sensor_msgs::msg::Image::ConstSharedPtr & msg) @@ -131,7 +138,7 @@ cv::Mat PngSnapshotStreamer::decode_image(const sensor_msgs::msg::Image::ConstSh return cv_bridge::toCvCopy(msg, "bgra8")->image; } // Use the normal decode otherwise - return ImageTransportStreamerBase::decode_image(msg); + return ImageStreamerBase::decode_image(msg); } void PngSnapshotStreamer::send_image( @@ -168,9 +175,10 @@ void PngSnapshotStreamer::send_image( std::shared_ptr PngSnapshotStreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) { - return std::make_shared(request, connection, node); + return std::make_shared(request, connection, subscriber_factories, node); } } // namespace streamers diff --git a/src/streamers/ros_compressed_streamer.cpp b/src/streamers/ros_compressed_streamer.cpp index 2e0ca14..5aa7d1b 100644 --- a/src/streamers/ros_compressed_streamer.cpp +++ b/src/streamers/ros_compressed_streamer.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include @@ -59,6 +60,7 @@ #include "web_video_server/streamer.hpp" #include "web_video_server/streamers/jpeg_streamers.hpp" #include "web_video_server/utils.hpp" +#include "web_video_server/subscriber.hpp" namespace web_video_server { @@ -164,17 +166,29 @@ std::vector collect_compressed_topics(rclcpp::Node & node) RosCompressedStreamer::RosCompressedStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) -: StreamerBase(request, connection, node, "ros_compressed_streamer"), stream_(connection) +: StreamerBase(request, connection, subscriber_factories, node, "ros_compressed_streamer"), + stream_(connection) { stream_.send_initial_header(); - qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default"); + + auto node_locked = lock_node(); + if (!node_locked) { + return; + } + + const std::string default_qos_profile = + node_locked->get_parameter("default_qos_profile").as_string(); + auto qos_profile_name = request.get_query_param_value_or_default( + "qos_profile", + default_qos_profile); } RosCompressedStreamer::~RosCompressedStreamer() { this->inactive_ = true; - const std::scoped_lock lock(send_mutex_); // protects send_image. + const std::scoped_lock lock(send_mutex); // protects send_image. } void RosCompressedStreamer::start() @@ -197,7 +211,7 @@ void RosCompressedStreamer::restream_frame(std::chrono::duration max_age } if (last_frame_ + max_age < std::chrono::steady_clock::now()) { - const std::scoped_lock lock(send_mutex_); + const std::scoped_lock lock(send_mutex); // don't update last_frame, it may remain an old value. send_image(last_msg_, std::chrono::steady_clock::now()); } @@ -242,7 +256,7 @@ void RosCompressedStreamer::send_image( void RosCompressedStreamer::image_callback( const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg) { - const std::scoped_lock lock(send_mutex_); // protects last_msg_ and last_frame_ + const std::scoped_lock lock(send_mutex); // protects last_msg_ and last_frame_ last_msg_ = msg; last_frame_ = std::chrono::steady_clock::now(); send_image(last_msg_, last_frame_); @@ -252,6 +266,7 @@ void RosCompressedStreamer::image_callback( std::shared_ptr RosCompressedStreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) { auto node_locked = node.lock(); @@ -267,22 +282,25 @@ std::shared_ptr RosCompressedStreamerFactory::create_streamer RCLCPP_WARN( node_locked->get_logger().get_child("RosCompressedStreamerFactory"), "Could not find compressed image topic for %s, falling back to mjpeg", topic.c_str()); - return std::make_shared(request, connection, node); + return std::make_shared(request, connection, subscriber_factories, node); } - return std::make_shared(request, connection, node); + return std::make_shared(request, connection, subscriber_factories, node); } std::vector RosCompressedStreamerFactory::get_available_topics( - rclcpp::Node & node) + rclcpp::Node & node, + std::map>/*subscriber_factories*/) { return collect_compressed_topics(node); } RosCompressedSnapshotStreamer::RosCompressedSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::WeakPtr node) -: StreamerBase(request, connection, node, "ros_compressed_snapshot_streamer") + async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, + rclcpp::Node::WeakPtr node) +: StreamerBase(request, connection, subscriber_factories, node, "ros_compressed_snapshot_streamer") { qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default"); } @@ -376,6 +394,7 @@ std::shared_ptr RosCompressedSnapshotStreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) { auto node_locked = node.lock(); @@ -391,13 +410,15 @@ RosCompressedSnapshotStreamerFactory::create_streamer( RCLCPP_WARN( node_locked->get_logger().get_child("RosCompressedSnapshotStreamerFactory"), "Could not find compressed image topic for %s, falling back to jpeg", topic.c_str()); - return std::make_shared(request, connection, node); + return std::make_shared(request, connection, subscriber_factories, node); } - return std::make_shared(request, connection, node); + return std::make_shared( + request, connection, subscriber_factories, node); } std::vector RosCompressedSnapshotStreamerFactory::get_available_topics( - rclcpp::Node & node) + rclcpp::Node & node, + std::map>/*subscriber_factories*/) { return collect_compressed_topics(node); } diff --git a/src/streamers/vp8_streamer.cpp b/src/streamers/vp8_streamer.cpp index 82f0df5..09d2f94 100644 --- a/src/streamers/vp8_streamer.cpp +++ b/src/streamers/vp8_streamer.cpp @@ -36,6 +36,7 @@ extern "C" #include } +#include #include #include #include @@ -46,6 +47,7 @@ extern "C" #include "web_video_server/streamer.hpp" #include "web_video_server/streamers/libav_streamer.hpp" +#include "web_video_server/subscriber.hpp" namespace web_video_server { @@ -54,8 +56,11 @@ namespace streamers Vp8Streamer::Vp8Streamer( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::WeakPtr node) -: LibavStreamerBase(request, connection, node, "vp8_streamer", "webm", "libvpx", "video/webm") + async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, + rclcpp::Node::WeakPtr node) +: LibavStreamerBase(request, connection, subscriber_factories, node, + "vp8_streamer", "webm", "libvpx", "video/webm") { quality_ = request.get_query_param_value_or_default("quality", "realtime"); } @@ -92,9 +97,10 @@ void Vp8Streamer::initialize_encoder() std::shared_ptr Vp8StreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) { - return std::make_shared(request, connection, node); + return std::make_shared(request, connection, subscriber_factories, node); } } // namespace streamers diff --git a/src/streamers/vp9_streamer.cpp b/src/streamers/vp9_streamer.cpp index f2b1187..b29d639 100644 --- a/src/streamers/vp9_streamer.cpp +++ b/src/streamers/vp9_streamer.cpp @@ -35,8 +35,10 @@ extern "C" #include } +#include #include #include +#include #include "async_web_server_cpp/http_connection.hpp" #include "async_web_server_cpp/http_request.hpp" @@ -44,6 +46,7 @@ extern "C" #include "web_video_server/streamer.hpp" #include "web_video_server/streamers/libav_streamer.hpp" +#include "web_video_server/subscriber.hpp" namespace web_video_server { @@ -52,8 +55,11 @@ namespace streamers Vp9Streamer::Vp9Streamer( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::WeakPtr node) -: LibavStreamerBase(request, connection, node, "vp9_streamer", "webm", "libvpx-vp9", "video/webm") + async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, + rclcpp::Node::WeakPtr node) +: LibavStreamerBase(request, connection, subscriber_factories, node, + "vp9_streamer", "webm", "libvpx-vp9", "video/webm") { } Vp9Streamer::~Vp9Streamer() @@ -73,9 +79,10 @@ void Vp9Streamer::initialize_encoder() std::shared_ptr Vp9StreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, + std::map> & subscriber_factories, rclcpp::Node::WeakPtr node) { - return std::make_shared(request, connection, node); + return std::make_shared(request, connection, subscriber_factories, node); } } // namespace streamers diff --git a/src/subscriber.cpp b/src/subscriber.cpp new file mode 100644 index 0000000..51b2223 --- /dev/null +++ b/src/subscriber.cpp @@ -0,0 +1,106 @@ +// Copyright (c) 2014, Worcester Polytechnic Institute +// Copyright (c) 2024-2025, The Robot Web Tools Contributors +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#include "web_video_server/subscriber.hpp" + +#include +#include +#include +#include + +#include "rclcpp/node.hpp" +#include "rclcpp/logging.hpp" +#include "rclcpp/qos.hpp" +#include "rmw/qos_profiles.h" +#include "sensor_msgs/msg/image.hpp" + +#include "async_web_server_cpp/http_request.hpp" +#include "web_video_server/utils.hpp" +namespace web_video_server +{ + +SubscriberBase::SubscriberBase( + rclcpp::Node::SharedPtr node, + std::string logger_name) +: node_(node), + logger_(node->get_logger().get_child(logger_name)), + inactive_(false) +{ +} + +void SubscriberBase::subscribe( + const async_web_server_cpp::HttpRequest & request, + const std::string & topic, + const ImageCallback & callback) +{ + const std::scoped_lock lock(subscriber_mutex); + + callback_ = callback; + const std::string default_qos_profile = node_->get_parameter("default_qos_profile").as_string(); + auto qos_profile_name = request.get_query_param_value_or_default( + "qos_profile", + default_qos_profile); + + // Get QoS profile from query parameter + RCLCPP_INFO( + logger_, "Streaming topic %s with QoS profile %s", topic.c_str(), + qos_profile_name.c_str()); + auto qos_profile = get_qos_profile_from_name(qos_profile_name); + if (!qos_profile) { + qos_profile = rmw_qos_profile_default; + RCLCPP_ERROR( + logger_, "Invalid QoS profile %s specified. Using default profile.", + qos_profile_name.c_str()); + } + + const rclcpp::QoS qos = rclcpp::QoS( + rclcpp::QoSInitialization(qos_profile.value().history, 1), + qos_profile.value()); + + // Create subscriber + sub_ = node_->create_subscription( + topic, qos, + std::bind(&SubscriberBase::subscriber_callback, this, std::placeholders::_1)); +} + +void SubscriberBase::subscriber_callback(const sensor_msgs::msg::Image::ConstSharedPtr & input_msg) +{ + const std::scoped_lock lock(subscriber_mutex); + + try_forward_image(input_msg); +} + +std::vector SubscriberFactoryInterface::get_available_topics( + rclcpp::Node & /* node */) +{ + return {}; +} + +} // namespace web_video_server diff --git a/src/subscribers/image_transport_subscriber.cpp b/src/subscribers/image_transport_subscriber.cpp new file mode 100644 index 0000000..bce1f28 --- /dev/null +++ b/src/subscribers/image_transport_subscriber.cpp @@ -0,0 +1,153 @@ +// Copyright (c) 2024-2025, The Robot Web Tools Contributors +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#include +#include +#include +#include + +#include "rclcpp/node.hpp" +#include "rclcpp/logging.hpp" +#include "rmw/qos_profiles.h" +#include "sensor_msgs/msg/image.hpp" +#include "image_transport/image_transport.hpp" + +#include "async_web_server_cpp/http_request.hpp" +#include "web_video_server/subscribers/image_transport_subscriber.hpp" +#include "web_video_server/utils.hpp" +#include "web_video_server/subscriber.hpp" + +namespace web_video_server +{ +namespace subscribers +{ +ImageTransportSubscriber::ImageTransportSubscriber(rclcpp::Node::SharedPtr node) +: SubscriberBase(node, "image_transport_subscriber") +{ + const std::scoped_lock lock(subscriber_mutex); + + if (!node_->has_parameter("default_transport")) { + node_->declare_parameter("default_transport", "raw"); + } +} + +ImageTransportSubscriber::~ImageTransportSubscriber() +{ + const std::scoped_lock lock(subscriber_mutex); + inactive_ = true; +} + +// We disable deprecation warnings for image_transport API usage +// to maintain compatibility with older ROS 2 distributions. +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +// NOLINTBEGIN(clang-diagnostic-deprecated-declarations) + +void ImageTransportSubscriber::subscribe( + const async_web_server_cpp::HttpRequest & request, + const std::string & topic, + const ImageCallback & callback) +{ + const std::scoped_lock lock(subscriber_mutex); + + callback_ = callback; + const std::string default_transport = node_->get_parameter("default_transport").as_string(); + const std::string transport = request.get_query_param_value_or_default( + "default_transport", + default_transport); + + const std::string default_qos_profile = node_->get_parameter("default_qos_profile").as_string(); + auto qos_profile_name = request.get_query_param_value_or_default( + "qos_profile", + default_qos_profile); + + // Get QoS profile from query parameter + RCLCPP_INFO( + logger_, "Streaming topic %s with QoS profile %s", topic.c_str(), + qos_profile_name.c_str()); + auto qos_profile = get_qos_profile_from_name(qos_profile_name); + if (!qos_profile) { + qos_profile = rmw_qos_profile_default; + RCLCPP_ERROR( + logger_, + "Invalid QoS profile %s specified. Using default profile.", + qos_profile_name.c_str()); + } + + const auto qos = qos_profile.value(); + + sub_ = image_transport::create_subscription( + node_.get(), topic, + std::bind(&ImageTransportSubscriber::subscriber_callback, this, std::placeholders::_1), + transport, qos); +} + +#pragma GCC diagnostic pop +// NOLINTEND(clang-diagnostic-deprecated-declarations) + +void ImageTransportSubscriber::subscriber_callback( + const sensor_msgs::msg::Image::ConstSharedPtr & input_msg) +{ + const std::scoped_lock lock(subscriber_mutex); + + if (inactive_) {return;} + + try_forward_image(input_msg); +} + +std::shared_ptr ImageTransportSubscriberFactory::create_subscriber( + rclcpp::Node::SharedPtr node) +{ + return std::make_shared(node); +} + +std::vector ImageTransportSubscriberFactory::get_available_topics( + rclcpp::Node & node) +{ + std::vector result; + auto topic_names_and_types = node.get_topic_names_and_types(); + for (const auto & topic_and_types : topic_names_and_types) { + for (const auto & type : topic_and_types.second) { + if (type == this->get_type()) { + result.push_back(topic_and_types.first); + break; + } + } + } + return result; +} + +} // namespace subscribers +} // namespace web_video_server + +#include "pluginlib/class_list_macros.hpp" + +PLUGINLIB_EXPORT_CLASS( + web_video_server::subscribers::ImageTransportSubscriberFactory, + web_video_server::SubscriberFactoryInterface) diff --git a/src/web_video_server.cpp b/src/web_video_server.cpp index 9d2ff74..0318c2a 100644 --- a/src/web_video_server.cpp +++ b/src/web_video_server.cpp @@ -67,15 +67,19 @@ namespace web_video_server WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options) : rclcpp::Node("web_video_server", options), handler_group_( async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::not_found)), - streamer_factory_loader_("web_video_server", "web_video_server::StreamerFactoryInterface"), + streamer_factory_loader_("web_video_server", + "web_video_server::StreamerFactoryInterface"), snapshot_streamer_factory_loader_("web_video_server", - "web_video_server::SnapshotStreamerFactoryInterface") + "web_video_server::SnapshotStreamerFactoryInterface"), + subscriber_factory_loader_("web_video_server", + "web_video_server::SubscriberFactoryInterface") { declare_parameter("port", 8080); declare_parameter("verbose", true); declare_parameter("address", "0.0.0.0"); declare_parameter("server_threads", 1); declare_parameter("publish_rate", -1.0); + declare_parameter("default_qos_profile", "default"); declare_parameter("default_stream_type", "mjpeg"); declare_parameter("default_snapshot_type", "jpeg"); @@ -88,23 +92,39 @@ WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options) get_parameter("default_stream_type", default_stream_type_); get_parameter("default_snapshot_type", default_snapshot_type_); + for (auto cls : subscriber_factory_loader_.getDeclaredClasses()) { + RCLCPP_INFO(get_logger(), "Loading subscriber plugin: %s", cls.c_str()); + try { + auto subscriber = subscriber_factory_loader_.createSharedInstance(cls); + subscriber_factories_[subscriber->get_type()] = subscriber; + } catch (pluginlib::PluginlibException & ex) { + RCLCPP_ERROR( + get_logger(), + "The subscriber plugin failed to load for some reason. Error: %s", ex.what()); + } + } + for (auto cls : streamer_factory_loader_.getDeclaredClasses()) { RCLCPP_INFO(get_logger(), "Loading streamer plugin: %s", cls.c_str()); try { auto streamer = streamer_factory_loader_.createSharedInstance(cls); streamer_factories_[streamer->get_type()] = streamer; } catch (pluginlib::PluginlibException & ex) { - RCLCPP_ERROR(get_logger(), "The plugin failed to load for some reason. Error: %s", ex.what()); + RCLCPP_ERROR( + get_logger(), + "The streamer plugin failed to load for some reason. Error: %s", ex.what()); } } for (auto cls : snapshot_streamer_factory_loader_.getDeclaredClasses()) { - RCLCPP_INFO(get_logger(), "Loading streamer plugin: %s", cls.c_str()); + RCLCPP_INFO(get_logger(), "Loading snapshot plugin: %s", cls.c_str()); try { auto streamer = snapshot_streamer_factory_loader_.createSharedInstance(cls); snapshot_streamer_factories_[streamer->get_type()] = streamer; } catch (pluginlib::PluginlibException & ex) { - RCLCPP_ERROR(get_logger(), "The plugin failed to load for some reason. Error: %s", ex.what()); + RCLCPP_ERROR( + get_logger(), + "The snapshot plugin failed to load for some reason. Error: %s", ex.what()); } } @@ -204,7 +224,7 @@ bool WebVideoServer::handle_stream( const std::string type = request.get_query_param_value_or_default("type", default_stream_type_); if (streamer_factories_.find(type) != streamer_factories_.end()) { const std::shared_ptr streamer = streamer_factories_[type]->create_streamer( - request, connection, weak_from_this()); + request, connection, subscriber_factories_, weak_from_this()); streamer->start(); const std::scoped_lock lock(streamers_mutex_); streamers_.push_back(streamer); @@ -224,7 +244,7 @@ bool WebVideoServer::handle_snapshot( if (snapshot_streamer_factories_.find(type) != snapshot_streamer_factories_.end()) { const std::shared_ptr streamer = snapshot_streamer_factories_[type]->create_streamer( - request, connection, weak_from_this()); + request, connection, subscriber_factories_, weak_from_this()); streamer->start(); const std::scoped_lock lock(streamers_mutex_); streamers_.push_back(streamer); @@ -275,7 +295,7 @@ bool WebVideoServer::handle_list_streams( for (const auto & factory_pair : streamer_factories_) { RCLCPP_DEBUG(get_logger(), "Getting topics from factory: %s", factory_pair.first.c_str()); const std::vector factory_topics = - factory_pair.second->get_available_topics(*this); + factory_pair.second->get_available_topics(*this, subscriber_factories_); RCLCPP_DEBUG( get_logger(), "Factory %s returned %zu topics", factory_pair.first.c_str(), factory_topics.size()); @@ -289,7 +309,7 @@ bool WebVideoServer::handle_list_streams( for (const auto & factory_pair : snapshot_streamer_factories_) { RCLCPP_DEBUG(get_logger(), "Getting topics from factory: %s", factory_pair.first.c_str()); const std::vector factory_topics = - factory_pair.second->get_available_topics(*this); + factory_pair.second->get_available_topics(*this, subscriber_factories_); RCLCPP_DEBUG( get_logger(), "Factory %s returned %zu topics", factory_pair.first.c_str(), factory_topics.size()); diff --git a/plugins.xml b/streamer_plugins.xml similarity index 100% rename from plugins.xml rename to streamer_plugins.xml diff --git a/subscriber_plugins.xml b/subscriber_plugins.xml new file mode 100644 index 0000000..abf3ff7 --- /dev/null +++ b/subscriber_plugins.xml @@ -0,0 +1,7 @@ + + + Subscribes to images using image_transport. + + \ No newline at end of file