Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(velodyne): enable dynamic reconfigure of launch_hw parameter #263

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,20 @@ Status VelodyneHwInterface::get_sensor_configuration(SensorConfigurationBase & s

VelodyneStatus VelodyneHwInterface::init_http_client()
{
try {
http_client_driver_->init_client(sensor_configuration_->sensor_ip, 80);
if (!http_client_driver_->client()->isOpen()) {
http_client_driver_->client()->open();
while (true) {
try {
http_client_driver_->init_client(sensor_configuration_->sensor_ip, 80);
if (!http_client_driver_->client()->isOpen()) {
http_client_driver_->client()->open();
}
return Status::OK;
} catch (const std::exception & ex) {
RCLCPP_INFO_ONCE(
*parent_node_logger_, "Cannot connect to lidar because of: %s. Will keep trying...",
ex.what());
std::this_thread::sleep_for(std::chrono::seconds(5));
}
} catch (const std::exception & ex) {
VelodyneStatus status = Status::HTTP_CONNECTION_ERROR;
return status;
}
return Status::OK;
}

void VelodyneHwInterface::string_callback(const std::string & str)
Expand Down
2 changes: 1 addition & 1 deletion nebula_ros/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ project(nebula_ros)

# Default to C++17
if (NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD 20)
endif ()

if (CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
Expand Down
17 changes: 15 additions & 2 deletions nebula_ros/include/nebula_ros/common/mt_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,32 @@ class MtQueue
void push(T && value)
{
std::unique_lock<std::mutex> lock(this->mutex_);
this->cv_not_full_.wait(lock, [=] { return this->queue_.size() < this->capacity_; });
this->cv_not_full_.wait(lock, [this] { return this->queue_.size() < this->capacity_; });
queue_.push_front(std::move(value));
this->cv_not_empty_.notify_all();
}

T pop()
{
std::unique_lock<std::mutex> lock(this->mutex_);
this->cv_not_empty_.wait(lock, [=] { return !this->queue_.empty(); });
this->cv_not_empty_.wait(lock, [this] { return !this->queue_.empty(); });
T return_value(std::move(this->queue_.back()));
this->queue_.pop_back();
this->cv_not_full_.notify_all();

return return_value;
}

std::pair<T, bool> pop(std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lock(this->mutex_);
if (!this->cv_not_empty_.wait_for(lock, timeout, [this] { return !this->queue_.empty(); })) {
return std::make_pair(T(), false);
}
T return_value(std::move(this->queue_.back()));
this->queue_.pop_back();
this->cv_not_full_.notify_all();

return std::make_pair(std::move(return_value), true);
}
};
22 changes: 21 additions & 1 deletion nebula_ros/include/nebula_ros/velodyne/velodyne_ros_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ class VelodyneRosWrapper final : public rclcpp::Node

Status declare_and_get_sensor_config_params();

void reconfigure_hw_interface();

void create_packet_subscriber();
void reset_packet_subscriber();

// HW interface management
void bringup_hw(bool);
void cleanup_on_hw_reconfigure();
void setup_decoder();

// Decoder thread management
void decoder_wrapper_thread(std::stop_token stoken);
void set_decoder_wrapper();
void stop_decoder_thread() { decoder_thread_.request_stop(); }

/// @brief rclcpp parameter callback
/// @param parameters Received parameters
/// @return SetParametersResult
Expand All @@ -85,15 +100,20 @@ class VelodyneRosWrapper final : public rclcpp::Node
/// @brief Stores received packets that have not been processed yet by the decoder thread
MtQueue<std::unique_ptr<nebula_msgs::msg::NebulaPacket>> packet_queue_;
/// @brief Thread to isolate decoding from receiving
std::thread decoder_thread_;
std::jthread decoder_thread_;

rclcpp::Subscription<velodyne_msgs::msg::VelodyneScan>::SharedPtr packets_sub_{};

bool launch_hw_;
bool use_udp_only_;

bool restart_hw_ = false;
bool restart_packet_subscriber_ = false;

std::optional<VelodyneHwInterfaceWrapper> hw_interface_wrapper_;
std::optional<VelodyneHwMonitorWrapper> hw_monitor_wrapper_;
std::optional<VelodyneDecoderWrapper> decoder_wrapper_;
rclcpp::TimerBase::SharedPtr hw_reconfigure_timer_;

std::mutex mtx_config_;

Expand Down
8 changes: 6 additions & 2 deletions nebula_ros/src/velodyne/decoder_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ VelodyneDecoderWrapper::VelodyneDecoderWrapper(
"VelodyneDecoderWrapper cannot be instantiated without a valid config!");
}

calibration_file_path_ =
parent_node->declare_parameter<std::string>("calibration_file", param_read_write());
if (parent_node->has_parameter("calibration_file")) {
calibration_file_path_ = parent_node->get_parameter("calibration_file").as_string();
} else {
calibration_file_path_ =
parent_node->declare_parameter<std::string>("calibration_file", param_read_write());
}
auto calibration_result = get_calibration_data(calibration_file_path_);

if (!calibration_result.has_value()) {
Expand Down
7 changes: 5 additions & 2 deletions nebula_ros/src/velodyne/hw_interface_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ VelodyneHwInterfaceWrapper::VelodyneHwInterfaceWrapper(
status_(Status::NOT_INITIALIZED),
use_udp_only_(use_udp_only)
{
setup_sensor_ = parent_node->declare_parameter<bool>("setup_sensor", param_read_only());

if (parent_node->has_parameter("setup_sensor")) {
setup_sensor_ = parent_node->get_parameter("setup_sensor").as_bool();
} else {
setup_sensor_ = parent_node->declare_parameter<bool>("setup_sensor", param_read_only());
}
hw_interface_->set_logger(
std::make_shared<rclcpp::Logger>(parent_node->get_logger().get_child("HwInterface")));
status_ = hw_interface_->initialize_sensor_configuration(config);
Expand Down
14 changes: 11 additions & 3 deletions nebula_ros/src/velodyne/hw_monitor_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@ VelodyneHwMonitorWrapper::VelodyneHwMonitorWrapper(
parent_node_(parent_node),
sensor_configuration_(config)
{
diag_span_ = parent_node->declare_parameter<uint16_t>("diag_span", param_read_only());
show_advanced_diagnostics_ =
parent_node->declare_parameter<bool>("advanced_diagnostics", param_read_only());
if (parent_node->has_parameter("diag_span")) {
diag_span_ = parent_node->get_parameter("diag_span").as_int();
} else {
diag_span_ = parent_node->declare_parameter<uint16_t>("diag_span", param_read_only());
}
if (parent_node->has_parameter("advanced_diagnostics")) {
show_advanced_diagnostics_ = parent_node->get_parameter("advanced_diagnostics").as_bool();
} else {
show_advanced_diagnostics_ =
parent_node->declare_parameter<bool>("advanced_diagnostics", param_read_only());
}

std::cout << "Get model name and serial." << std::endl;
auto str = hw_interface_->get_snapshot();
Expand Down
156 changes: 127 additions & 29 deletions nebula_ros/src/velodyne/velodyne_ros_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include <nebula_common/util/string_conversions.hpp>

#include <pthread.h> // debug only

#include <algorithm>
#include <cstdio>
#include <memory>
Expand All @@ -22,8 +24,11 @@ VelodyneRosWrapper::VelodyneRosWrapper(const rclcpp::NodeOptions & options)
packet_queue_(3000),
hw_interface_wrapper_(),
hw_monitor_wrapper_(),
decoder_wrapper_()
decoder_wrapper_(),
hw_reconfigure_timer_(this->create_wall_timer(
std::chrono::seconds(1), std::bind(&VelodyneRosWrapper::reconfigure_hw_interface, this)))
{
hw_reconfigure_timer_->cancel();
setvbuf(stdout, NULL, _IONBF, BUFSIZ);

wrapper_status_ = declare_and_get_sensor_config_params();
Expand All @@ -34,51 +39,92 @@ VelodyneRosWrapper::VelodyneRosWrapper(const rclcpp::NodeOptions & options)

RCLCPP_INFO_STREAM(get_logger(), "Sensor Configuration: " << *sensor_cfg_ptr_);

launch_hw_ = declare_parameter<bool>("launch_hw", param_read_only());
bool use_udp_only = declare_parameter<bool>("udp_only", param_read_only());
launch_hw_ = declare_parameter<bool>("launch_hw", param_read_write());
use_udp_only_ = declare_parameter<bool>("udp_only", param_read_only());

if (use_udp_only) {
if (use_udp_only_) {
RCLCPP_INFO_STREAM(
get_logger(),
"UDP-only mode is enabled. Settings checks, synchronization, and diagnostics publishing are "
"disabled.");
}

if (launch_hw_) {
hw_interface_wrapper_.emplace(this, sensor_cfg_ptr_, use_udp_only);
if (!use_udp_only) { // hardware monitor requires HTTP connection
hw_monitor_wrapper_.emplace(this, hw_interface_wrapper_->hw_interface(), sensor_cfg_ptr_);
}
bringup_hw(use_udp_only_);
} else {
create_packet_subscriber();
}

setup_decoder();

// Register parameter callback after all params have been declared. Otherwise it would be called
// once for each declaration
parameter_event_cb_ = add_on_set_parameters_callback(
std::bind(&VelodyneRosWrapper::on_parameter_change, this, std::placeholders::_1));
}

void VelodyneRosWrapper::reconfigure_hw_interface()
{
if (restart_hw_) {
bringup_hw(use_udp_only_);
setup_decoder();
restart_hw_ = false;
hw_reconfigure_timer_->cancel();
}

if (restart_packet_subscriber_) {
create_packet_subscriber();
setup_decoder();
restart_packet_subscriber_ = false;
hw_reconfigure_timer_->cancel();
}
}

void VelodyneRosWrapper::create_packet_subscriber()
{
packets_sub_ = create_subscription<velodyne_msgs::msg::VelodyneScan>(
"velodyne_packets", rclcpp::SensorDataQoS(),
std::bind(&VelodyneRosWrapper::receive_scan_message_callback, this, std::placeholders::_1));
RCLCPP_INFO_STREAM(
get_logger(),
"Hardware connection disabled, listening for packets on " << packets_sub_->get_topic_name());
}

void VelodyneRosWrapper::set_decoder_wrapper()
{
// If there's an existing decoder wrapper, reset it
if (decoder_wrapper_) {
decoder_wrapper_.reset();
}
decoder_wrapper_.emplace(
this, hw_interface_wrapper_ ? hw_interface_wrapper_->hw_interface() : nullptr, sensor_cfg_ptr_);
}

void VelodyneRosWrapper::decoder_wrapper_thread(std::stop_token stoken)
{
RCLCPP_DEBUG(get_logger(), "Starting stream");

decoder_thread_ = std::thread([this]() {
while (true) {
decoder_wrapper_->process_cloud_packet(packet_queue_.pop());
while (!stoken.stop_requested()) {
auto [packet, valid] = packet_queue_.pop(std::chrono::milliseconds(100));
if (valid) {
decoder_wrapper_->process_cloud_packet(std::move(packet));
} else {
continue;
}
});

if (launch_hw_) {
hw_interface_wrapper_->hw_interface()->register_scan_callback(
std::bind(&VelodyneRosWrapper::receive_cloud_packet_callback, this, std::placeholders::_1));
stream_start();
} else {
packets_sub_ = create_subscription<velodyne_msgs::msg::VelodyneScan>(
"velodyne_packets", rclcpp::SensorDataQoS(),
std::bind(&VelodyneRosWrapper::receive_scan_message_callback, this, std::placeholders::_1));
RCLCPP_INFO_STREAM(
get_logger(),
"Hardware connection disabled, listening for packets on " << packets_sub_->get_topic_name());
}
RCLCPP_INFO(get_logger(), "Gracefully stopped decoder thread");
}

// Register parameter callback after all params have been declared. Otherwise it would be called
// once for each declaration
parameter_event_cb_ = add_on_set_parameters_callback(
std::bind(&VelodyneRosWrapper::on_parameter_change, this, std::placeholders::_1));
void VelodyneRosWrapper::bringup_hw(bool use_udp_only)
{
hw_interface_wrapper_.emplace(this, sensor_cfg_ptr_, use_udp_only);

if (!use_udp_only) { // hardware monitor requires HTTP connection
hw_monitor_wrapper_.emplace(this, hw_interface_wrapper_->hw_interface(), sensor_cfg_ptr_);
}
hw_interface_wrapper_->hw_interface()->register_scan_callback(
std::bind(&VelodyneRosWrapper::receive_cloud_packet_callback, this, std::placeholders::_1));
stream_start();
}

nebula::Status VelodyneRosWrapper::declare_and_get_sensor_config_params()
Expand Down Expand Up @@ -194,6 +240,38 @@ Status VelodyneRosWrapper::stream_start()
return hw_interface_wrapper_->hw_interface()->sensor_interface_start();
}

void VelodyneRosWrapper::cleanup_on_hw_reconfigure()
{
while (true) {
if (decoder_thread_.joinable()) {
stop_decoder_thread();
break;
}
}
decoder_thread_.join();
if (hw_interface_wrapper_) {
hw_interface_wrapper_.reset();
}
if (hw_monitor_wrapper_) {
hw_monitor_wrapper_.reset();
}
}

void VelodyneRosWrapper::setup_decoder()
{
set_decoder_wrapper();
decoder_thread_ = std::jthread(&VelodyneRosWrapper::decoder_wrapper_thread, this);
// Set the thread name
pthread_setname_np(decoder_thread_.native_handle(), "VelDecThread"); // debug only
}

void VelodyneRosWrapper::reset_packet_subscriber()
{
if (packets_sub_) {
packets_sub_.reset();
}
}

rcl_interfaces::msg::SetParametersResult VelodyneRosWrapper::on_parameter_change(
const std::vector<rclcpp::Parameter> & p)
{
Expand All @@ -216,7 +294,27 @@ rcl_interfaces::msg::SetParametersResult VelodyneRosWrapper::on_parameter_change
get_param(p, "max_range", new_cfg.max_range) |
get_param(p, "rotation_speed", new_cfg.rotation_speed) |
get_param(p, "cloud_min_angle", new_cfg.cloud_min_angle) |
get_param(p, "cloud_max_angle", new_cfg.cloud_max_angle);
get_param(p, "cloud_max_angle", new_cfg.cloud_max_angle) |
get_param(p, "launch_hw", launch_hw_);

if (got_any && launch_hw_ && !hw_interface_wrapper_) {
RCLCPP_INFO(get_logger(), "Cancelling packet subscription...");
reset_packet_subscriber();
RCLCPP_INFO(get_logger(), "Resetting HW interface and HW Monitor wrappers...");
cleanup_on_hw_reconfigure();
RCLCPP_INFO(get_logger(), "(re)Configuring HW interface");
restart_hw_ = true;
hw_reconfigure_timer_->reset();
}

if (got_any && !launch_hw_ && hw_interface_wrapper_) {
RCLCPP_INFO(get_logger(), "Resetting HW interface and HW Monitor wrappers...");
cleanup_on_hw_reconfigure();
reset_packet_subscriber();
RCLCPP_INFO(get_logger(), "(re) Configuring packet subscriber");
restart_packet_subscriber_ = true;
hw_reconfigure_timer_->reset();
}

// Currently, HW interface and monitor wrappers have only read-only parameters, so their update
// logic is not implemented
Expand Down