Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions diagnostic_aggregator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ find_package(ament_cmake REQUIRED)
find_package(diagnostic_msgs REQUIRED)
find_package(pluginlib REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclcpp_components REQUIRED)
find_package(rcl_interfaces REQUIRED)
find_package(std_msgs REQUIRED)

add_library(${PROJECT_NAME} SHARED
src/status_item.cpp
src/analyzer_group.cpp
src/aggregator.cpp)
target_include_directories(${PROJECT_NAME} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
Expand All @@ -29,11 +29,15 @@ ament_target_dependencies(${PROJECT_NAME}
"diagnostic_msgs"
"pluginlib"
"rclcpp"
"rclcpp_components"
"std_msgs"
)
target_compile_definitions(${PROJECT_NAME}
PRIVATE "DIAGNOSTIC_AGGREGATOR_BUILDING_DLL")

# Register the aggregator node as a component
rclcpp_components_register_nodes(${PROJECT_NAME} "diagnostic_aggregator::Aggregator")

# see https://github.com/pybind/pybind11/commit/ba33b2fc798418c8c9dfe801c5b9023d3703f417
if(NOT WIN32)
target_compile_options(${PROJECT_NAME} PRIVATE -Wdeprecated)
Expand All @@ -46,7 +50,9 @@ set(ANALYZERS "${PROJECT_NAME}_analyzers")
add_library(${ANALYZERS} SHARED
src/generic_analyzer.cpp
src/discard_analyzer.cpp
src/ignore_analyzer.cpp)
src/ignore_analyzer.cpp
src/analyzer_group.cpp
)
target_include_directories(${ANALYZERS} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
Expand Down Expand Up @@ -195,8 +201,9 @@ ament_python_install_package(${PROJECT_NAME})
set(ANALYZER_PARAMS_FILEPATH "${CMAKE_INSTALL_PREFIX}/share/${PROJECT_NAME}/example_analyzers.yaml")
set(ADD_ANALYZER_PARAMS_FILEPATH "${CMAKE_INSTALL_PREFIX}/share/${PROJECT_NAME}/example_add_analyzers.yaml")
configure_file(example/example.launch.py.in example.launch.py @ONLY)
configure_file(example/compose_example.launch.py.in compose_example.launch.py @ONLY)
install( # launch descriptor
FILES ${CMAKE_CURRENT_BINARY_DIR}/example.launch.py
FILES ${CMAKE_CURRENT_BINARY_DIR}/example.launch.py ${CMAKE_CURRENT_BINARY_DIR}/compose_example.launch.py
DESTINATION share/${PROJECT_NAME}
)
install( # example publisher
Expand All @@ -218,6 +225,7 @@ ament_export_dependencies(ament_cmake_python)
ament_export_dependencies(diagnostic_msgs)
ament_export_dependencies(pluginlib)
ament_export_dependencies(rclcpp)
ament_export_dependencies(rclcpp_components)
ament_export_dependencies(rclpy)
ament_export_dependencies(std_msgs)

Expand Down
35 changes: 35 additions & 0 deletions diagnostic_aggregator/example/compose_example.launch.py.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Launch analyzer loader with parameters from yaml."""

import launch
from launch_ros.actions import ComposableNodeContainer, Node
from launch_ros.descriptions import ComposableNode

analyzer_params_filepath = "@ANALYZER_PARAMS_FILEPATH@"
add_analyzer_params_filepath = "@ADD_ANALYZER_PARAMS_FILEPATH@"


def generate_launch_description():
container = ComposableNodeContainer(
name="diagnostics_container",
namespace='',
package='rclcpp_components',
executable='component_container',
composable_node_descriptions=[
ComposableNode(
package='diagnostic_aggregator',
plugin='diagnostic_aggregator::Aggregator',
name='analyzers',
parameters=[analyzer_params_filepath]
)
]
)

diag_publisher = Node(
package='diagnostic_aggregator',
executable='example_pub.py'
)
return launch.LaunchDescription([
container,
# add_analyzer,
diag_publisher,
])
7 changes: 5 additions & 2 deletions diagnostic_aggregator/example/example_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,11 @@ def timer_callback(self):
def main(args=None):
rclpy.init(args=args)

node = DiagnosticTalker()
rclpy.spin(node)
try:
node = DiagnosticTalker()
rclpy.spin(node)
except KeyboardInterrupt:
pass

node.destroy_node()
rclpy.try_shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ other_as_errors: false
* any analyzer is not properly specified, or returns false on initialization,
* the aggregator will report the error and publish it in the aggregated output.
*/
class Aggregator
class Aggregator : public rclcpp::Node
{
public:
/*!
Expand All @@ -127,13 +127,7 @@ class Aggregator
DIAGNOSTIC_AGGREGATOR_PUBLIC
void publishData();

DIAGNOSTIC_AGGREGATOR_PUBLIC
rclcpp::Node::SharedPtr get_node() const;

private:
rclcpp::Node::SharedPtr n_;

rclcpp::Logger logger_;
rclcpp::TimerBase::SharedPtr publish_timer_;

/// AddDiagnostics, /diagnostics_agg/add_diagnostics
Expand All @@ -156,7 +150,7 @@ class Aggregator
*/
void diagCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr diag_msg);

std::unique_ptr<AnalyzerGroup> analyzer_group_;
std::shared_ptr<Analyzer> analyzer_group_;
std::unique_ptr<OtherAnalyzer> other_analyzer_;

std::string base_path_; /**< \brief Prepended to all status names of aggregator. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class Analyzer
DIAGNOSTIC_AGGREGATOR_PUBLIC
virtual bool init(
const std::string & base_path, const std::string & breadcrumb,
const rclcpp::Node::SharedPtr node) = 0;
const rclcpp::Node &) = 0;

/*!
*\brief Returns true if analyzer will handle this item
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class AnalyzerGroup : public Analyzer
DIAGNOSTIC_AGGREGATOR_PUBLIC
virtual bool init(
const std::string & base_path, const std::string & breadcrumb,
const rclcpp::Node::SharedPtr node);
const rclcpp::Node & node);

/**!
*\brief Add an analyzer to this analyzerGroup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class GenericAnalyzer : public GenericAnalyzerBase
DIAGNOSTIC_AGGREGATOR_PUBLIC
bool init(
const std::string & base_path, const std::string & breadcrumb,
const rclcpp::Node::SharedPtr node);
const rclcpp::Node &);

/*!
*\brief Reports current state, returns vector of formatted status messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class GenericAnalyzerBase : public Analyzer
DIAGNOSTIC_AGGREGATOR_PUBLIC
bool init(
const std::string & base_path, const std::string & breadcrumb,
const rclcpp::Node::SharedPtr node) = 0;
const rclcpp::Node &) = 0;

/*
*\brief Must be initialized with path, and a "nice name"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class IgnoreAnalyzer : public Analyzer
DIAGNOSTIC_AGGREGATOR_PUBLIC
bool init(
const std::string & base_path, const std::string & breadcrumb,
const rclcpp::Node::SharedPtr node);
const rclcpp::Node &);

bool match(const std::string & name)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class OtherAnalyzer : public GenericAnalyzerBase
*/
bool init(
const std::string & base_path, const std::string & breadcrumb,
const rclcpp::Node::SharedPtr node)
const rclcpp::Node & node)
{
(void)base_path;
(void)breadcrumb;
Expand Down
1 change: 1 addition & 0 deletions diagnostic_aggregator/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<build_depend>rclcpp</build_depend>
<build_depend>std_msgs</build_depend>

<depend>rclcpp_components</depend>
<depend>rclpy</depend>

<test_depend>ament_cmake_gtest</test_depend>
Expand Down
73 changes: 39 additions & 34 deletions diagnostic_aggregator/src/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include <string>
#include <vector>

#include "pluginlib/class_loader.hpp"

namespace diagnostic_aggregator
{
using std::placeholders::_1;
Expand All @@ -60,40 +62,39 @@ Aggregator::Aggregator()
: Aggregator(rclcpp::NodeOptions()) {}

Aggregator::Aggregator(rclcpp::NodeOptions options)
: n_(std::make_shared<rclcpp::Node>(
"analyzers", "",
options.allow_undeclared_parameters(true).
automatically_declare_parameters_from_overrides(true))),
logger_(rclcpp::get_logger("Aggregator")),
: rclcpp::Node("analyzers", "",
options
.allow_undeclared_parameters(true)
.automatically_declare_parameters_from_overrides(true)),
pub_rate_(1.0),
history_depth_(1000),
clock_(n_->get_clock()),
clock_(this->get_clock()),
base_path_(""),
critical_(false),
last_top_level_state_(DiagnosticStatus::STALE)
{
RCLCPP_DEBUG(logger_, "constructor");
RCLCPP_DEBUG(this->get_logger(), "constructor");
initAnalyzers();

diag_sub_ = n_->create_subscription<DiagnosticArray>(
diag_sub_ = this->create_subscription<DiagnosticArray>(
"/diagnostics", rclcpp::SystemDefaultsQoS().keep_last(history_depth_),
std::bind(&Aggregator::diagCallback, this, _1));
agg_pub_ = n_->create_publisher<DiagnosticArray>("/diagnostics_agg", 1);
agg_pub_ = this->create_publisher<DiagnosticArray>("/diagnostics_agg", 1);
toplevel_state_pub_ =
n_->create_publisher<DiagnosticStatus>("/diagnostics_toplevel_state", 1);
this->create_publisher<DiagnosticStatus>("/diagnostics_toplevel_state", 1);

int publish_rate_ms = 1000 / pub_rate_;
publish_timer_ = n_->create_wall_timer(
publish_timer_ = this->create_wall_timer(
std::chrono::milliseconds(publish_rate_ms),
std::bind(&Aggregator::publishData, this));

param_sub_ = n_->create_subscription<rcl_interfaces::msg::ParameterEvent>(
param_sub_ = this->create_subscription<rcl_interfaces::msg::ParameterEvent>(
"/parameter_events", 1, std::bind(&Aggregator::parameterCallback, this, _1));
}

void Aggregator::parameterCallback(const rcl_interfaces::msg::ParameterEvent::SharedPtr msg)
{
if (msg->node == "/" + std::string(n_->get_name())) {
if (msg->node == "/" + std::string(this->get_name())) {
if (msg->new_parameters.size() != 0) {
base_path_ = "";
initAnalyzers();
Expand All @@ -106,10 +107,10 @@ void Aggregator::initAnalyzers()
bool other_as_errors = false;

std::map<std::string, rclcpp::Parameter> parameters;
if (!n_->get_parameters("", parameters)) {
RCLCPP_ERROR(logger_, "Couldn't retrieve parameters.");
if (!this->get_parameters("", parameters)) {
RCLCPP_ERROR(this->get_logger(), "Couldn't retrieve parameters.");
}
RCLCPP_DEBUG(logger_, "Retrieved %zu parameter(s).", parameters.size());
RCLCPP_DEBUG(this->get_logger(), "Retrieved %zu parameter(s).", parameters.size());

for (const auto & param : parameters) {
if (param.first.compare("pub_rate") == 0) {
Expand All @@ -128,18 +129,25 @@ void Aggregator::initAnalyzers()
critical_ = param.second.as_bool();
}
}
RCLCPP_DEBUG(logger_, "Aggregator publication rate configured to: %f", pub_rate_);
RCLCPP_DEBUG(logger_, "Aggregator base path configured to: %s", base_path_.c_str());
RCLCPP_DEBUG(this->get_logger(), "Aggregator publication rate configured to: %f", pub_rate_);
RCLCPP_DEBUG(this->get_logger(), "Aggregator base path configured to: %s", base_path_.c_str());
RCLCPP_DEBUG(
logger_, "Aggregator other_as_errors configured to: %s", (other_as_errors ? "true" : "false"));
this->get_logger(), "Aggregator other_as_errors configured to: %s",
(other_as_errors ? "true" : "false"));
RCLCPP_DEBUG(
logger_, "Aggregator critical publisher configured to: %s", (critical_ ? "true" : "false"));
this->get_logger(), "Aggregator critical publisher configured to: %s",
(critical_ ? "true" : "false"));

{ // lock the mutex while analyzer_group_ and other_analyzer_ are being updated
std::lock_guard<std::mutex> lock(mutex_);
analyzer_group_ = std::make_unique<AnalyzerGroup>();
if (!analyzer_group_->init(base_path_, "", n_)) {
RCLCPP_ERROR(logger_, "Analyzer group for diagnostic aggregator failed to initialize!");

pluginlib::ClassLoader<Analyzer> loader{"diagnostic_aggregator",
"diagnostic_aggregator::Analyzer"};
analyzer_group_ = loader.createSharedInstance("diagnostic_aggregator/AnalyzerGroup");

if (!analyzer_group_->init(base_path_, "", *this)) {
RCLCPP_ERROR(this->get_logger(),
"Analyzer group for diagnostic aggregator failed to initialize!");
}

// Last analyzer handles remaining data
Expand All @@ -150,7 +158,7 @@ void Aggregator::initAnalyzers()

void Aggregator::checkTimestamp(const DiagnosticArray::SharedPtr diag_msg)
{
RCLCPP_DEBUG(logger_, "checkTimestamp()");
RCLCPP_DEBUG(this->get_logger(), "checkTimestamp()");
if (diag_msg->header.stamp.sec != 0) {
return;
}
Expand All @@ -166,13 +174,13 @@ void Aggregator::checkTimestamp(const DiagnosticArray::SharedPtr diag_msg)

auto result = ros_warnings_.insert(stamp_warn);
if (result.second) {
RCLCPP_WARN(logger_, "%s", stamp_warn.c_str());
RCLCPP_WARN(this->get_logger(), "%s", stamp_warn.c_str());
}
}

void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
{
RCLCPP_DEBUG(logger_, "diagCallback()");
RCLCPP_DEBUG(this->get_logger(), "diagCallback()");
checkTimestamp(diag_msg);

bool analyzed = false;
Expand Down Expand Up @@ -205,12 +213,12 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)

Aggregator::~Aggregator()
{
RCLCPP_DEBUG(logger_, "destructor");
RCLCPP_DEBUG(this->get_logger(), "destructor");
}

void Aggregator::publishData()
{
RCLCPP_DEBUG(logger_, "publishData()");
RCLCPP_DEBUG(this->get_logger(), "publishData()");
DiagnosticArray diag_array;
DiagnosticStatus diag_toplevel_state;
diag_toplevel_state.name = "toplevel_state";
Expand Down Expand Up @@ -263,10 +271,7 @@ void Aggregator::publishData()
toplevel_state_pub_->publish(diag_toplevel_state);
}

rclcpp::Node::SharedPtr Aggregator::get_node() const
{
RCLCPP_DEBUG(logger_, "get_node()");
return this->n_;
}

} // namespace diagnostic_aggregator

#include "rclcpp_components/register_node_macro.hpp"
RCLCPP_COMPONENTS_REGISTER_NODE(diagnostic_aggregator::Aggregator)
2 changes: 1 addition & 1 deletion diagnostic_aggregator/src/aggregator_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ int main(int argc, char ** argv)

rclcpp::executors::SingleThreadedExecutor exec;
auto agg = std::make_shared<diagnostic_aggregator::Aggregator>();
exec.add_node(agg->get_node());
exec.add_node(agg);
exec.spin();

rclcpp::shutdown();
Expand Down
Loading