Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 115 additions & 27 deletions realtime_tools/include/realtime_tools/async_function_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@

namespace realtime_tools
{

/**
* @brief Enum class to define the scheduling policy for the async worker thread.
* SYNCHRONIZED: The async worker thread will be synchronized with the main thread, as the main
* thread will be triggering the async callback method.
* DETACHED: The async worker thread will be detached from the main thread and will have its own
* execution cycle.
* UNKNOWN: The scheduling policy is unknown.
*/
class AsyncSchedulingPolicy
{
public:
Expand Down Expand Up @@ -92,9 +99,38 @@ class AsyncSchedulingPolicy
Value value_ = UNKNOWN;
};

/**
* @brief The AsyncFunctionHandlerParams struct is used to configure the AsyncFunctionHandler.
* If the type is SYNCHRONIZED, the async worker thread will be synchronized with the main
* thread, as the main thread will be triggering the async callback method.
* If the type is DETACHED, the async worker thread will be detached from the main thread and
* will have its own execution cycle.
*
* @param thread_priority Priority of the async worker thread. Should be between 0 and 99.
* @param cpu_affinity_cores CPU cores to which the async worker thread should be pinned.
* If empty, the thread will not be pinned to any CPU core.
* @param scheduling_policy Scheduling policy for the async worker thread. Can be either
* SYNCHRONIZED or DETACHED.
* @param exec_rate Execution rate of the async worker thread in Hz. Only used if the
* scheduling_policy is DETACHED. Must be a positive integer.
* @param clock Clock to be used for the async worker thread. Only used if the scheduling_policy
* is DETACHED.
* @param logger Logger to be used for the async worker thread. If not set, a default logger will be used.
* @param trigger_predicate Predicate function to check if the async callback method should be triggered or not.
* If not set, the async callback method will be triggered every time.
* @param wait_until_initial_trigger Whether to wait until the initial trigger predicate is true before starting
* the async callback method. If true, the async callback method will not be called until the trigger predicate
* returns true for the first time. Very useful when the type is DETACHED.
* @param print_warnings Whether to print warnings when the async callback method is not triggered due to any reason.
*/
struct AsyncFunctionHandlerParams
{
bool validate()
/**
* @brief Validate the parameters.
* @return true if the parameters are valid, false otherwise.
* @throws std::runtime_error if the scheduling policy is UNKNOWN.
*/
bool validate() const
{
if (thread_priority < 0 || thread_priority > 99) {
RCLCPP_ERROR(
Expand All @@ -120,28 +156,77 @@ struct AsyncFunctionHandlerParams
RCLCPP_ERROR(logger, "The parsed trigger predicate is not valid!");
return false;
}
for (const int & core : cpu_affinity_cores) {
if (core < 0) {
RCLCPP_ERROR(logger, "Invalid CPU core id: %d. It should be a non-negative integer.", core);
return false;
}
}
return true;
}

/// @brief The AsyncFunctionHandlerParams struct is used to configure the AsyncFunctionHandler.
/// If the type is SYNCHRONIZED, the async worker thread will be synchronized with the main
/// thread, as the main thread will be triggering the async callback method.
/// If the type is DETACHED, the async worker thread will be detached from the main thread and
/// will have its own execution cycle.
int thread_priority = 50; /// Thread priority for the async worker thread
std::vector<int> cpu_affinity_cores =
{}; /// CPU cores to which the async worker thread should be pinned
AsyncSchedulingPolicy scheduling_policy =
AsyncSchedulingPolicy::SYNCHRONIZED; /// Scheduling policy for the async worker thread
unsigned int exec_rate = 0u; /// Execution rate of the async worker thread in Hz
rclcpp::Clock::SharedPtr clock = nullptr; /// Clock to be used for the async worker thread
rclcpp::Logger logger =
rclcpp::get_logger("AsyncFunctionHandler"); /// Logger to be used for the async worker thread
std::function<bool()> trigger_predicate = []() {
return true;
}; /// Predicate function to check if the async callback method should be triggered or not
bool wait_until_initial_trigger =
true; /// Whether to wait until the initial trigger predicate is true
/**
* @brief Initialize the parameters from a node's parameters.
* The node should have the following parameters:
* - thread_priority (int): Priority of the async worker thread. Default is 50.
* - cpu_affinity (int[]): CPU cores to which the async worker thread should be pinned.
* Default is empty, which means the thread will not be pinned to any CPU core.
* - scheduling_policy (string): Scheduling policy for the async worker thread. Can be either
* "synchronized" or "detached". Default is "synchronized".
* - execution_rate (int): Execution rate of the async worker thread in Hz.
* - wait_until_initial_trigger (bool): Whether to wait until the initial trigger predicate is true
* before starting the async callback method. Default is true.
* - print_warnings (bool): Whether to print warnings when the async callback method is not triggered
* due to any reason. Default is true.
* @param node The node that is used to get the parameters.
* @param prefix Parameter prefix to use when accessing node parameters.
*/
template <typename NodeT>
void initialize(NodeT & node, const std::string & prefix)
{
if (node->has_parameter(prefix + "thread_priority")) {
thread_priority = static_cast<int>(node->get_parameter(prefix + "thread_priority").as_int());
}
if (node->has_parameter(prefix + "cpu_affinity")) {
const auto cpu_affinity_param =
node->get_parameter(prefix + "cpu_affinity").as_integer_array();
for (const auto & core : cpu_affinity_param) {
cpu_affinity_cores.push_back(static_cast<int>(core));
}
}
if (node->has_parameter(prefix + "scheduling_policy")) {
scheduling_policy =
AsyncSchedulingPolicy(node->get_parameter(prefix + "scheduling_policy").as_string());
}
if (
scheduling_policy == AsyncSchedulingPolicy::DETACHED &&
node->has_parameter(prefix + "execution_rate")) {
const int execution_rate =
static_cast<int>(node->get_parameter(prefix + "execution_rate").as_int());
if (execution_rate <= 0) {
throw std::runtime_error(
"AsyncFunctionHandler: execution_rate parameter must be positive.");
}
exec_rate = static_cast<unsigned int>(execution_rate);
}
if (node->has_parameter(prefix + "wait_until_initial_trigger")) {
wait_until_initial_trigger =
node->get_parameter(prefix + "wait_until_initial_trigger").as_bool();
}
if (node->has_parameter(prefix + "print_warnings")) {
print_warnings = node->get_parameter(prefix + "print_warnings").as_bool();
}
}

int thread_priority = 50;
std::vector<int> cpu_affinity_cores = {};
AsyncSchedulingPolicy scheduling_policy = AsyncSchedulingPolicy::SYNCHRONIZED;
unsigned int exec_rate = 0u;
rclcpp::Clock::SharedPtr clock = nullptr;
rclcpp::Logger logger = rclcpp::get_logger("AsyncFunctionHandler");
std::function<bool()> trigger_predicate = []() { return true; };
bool wait_until_initial_trigger = true;
bool print_warnings = true;
};

/**
Expand Down Expand Up @@ -210,6 +295,7 @@ class AsyncFunctionHandler
std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
const AsyncFunctionHandlerParams & params)
{
params.validate();
init(callback, params.trigger_predicate, params.thread_priority);
params_ = params;
pause_thread_ = params.wait_until_initial_trigger;
Expand Down Expand Up @@ -250,7 +336,7 @@ class AsyncFunctionHandler
RCLCPP_WARN_ONCE(
params_.logger,
"AsyncFunctionHandler is configured with DETACHED scheduling policy. "
"This means that the async callback will not be synchronized with the main thread. ");
"This means that the async callback may not be synchronized with the main thread. ");
if (pause_thread_.load(std::memory_order_relaxed)) {
{
std::unique_lock<std::mutex> lock(async_mtx_);
Expand Down Expand Up @@ -560,11 +646,13 @@ class AsyncFunctionHandler
std::chrono::duration<double, std::milli>(time_now - next_iteration_time).count();
const double cm_period = 1.e3 / static_cast<double>(params_.exec_rate);
const int overrun_count = static_cast<int>(std::ceil(time_diff / cm_period));
RCLCPP_WARN_THROTTLE(
params_.logger, *params_.clock, 1000,
"Overrun detected! The async callback missed its desired rate of %d Hz. The loop "
"took %f ms (missed cycles : %d).",
params_.exec_rate, time_diff + cm_period, overrun_count + 1);
if (params_.print_warnings) {
RCLCPP_WARN_THROTTLE(
params_.logger, *params_.clock, 1000,
"Overrun detected! The async callback missed its desired rate of %d Hz. The loop "
"took %f ms (missed cycles : %d).",
params_.exec_rate, time_diff + cm_period, overrun_count + 1);
}
next_iteration_time += (overrun_count * period);
}
std::this_thread::sleep_until(next_iteration_time);
Expand Down
15 changes: 10 additions & 5 deletions realtime_tools/test/test_async_function_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <memory>

#include "gmock/gmock.h"
#include "rclcpp/node.hpp"
#include "rclcpp/rclcpp.hpp"
#include "test_async_function_handler.hpp"

Expand Down Expand Up @@ -432,12 +433,16 @@ TEST_F(AsyncFunctionHandlerTest, trigger_for_several_cycles_in_detached_scheduli
{
realtime_tools::TestAsyncFunctionHandler async_class;

rclcpp::Clock::SharedPtr clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
rclcpp::NodeOptions node_options;
node_options.arguments(
{"--ros-args", "-p", "scheduling_policy:=detached", "-p", "wait_until_initial_trigger:=false",
"-p", "execution_rate:=500", "-p", "thread_priority:=60", "-p", "cpu_affinity:=[0]"});
node_options.allow_undeclared_parameters(true);
node_options.automatically_declare_parameters_from_overrides(true);
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>("test_node", node_options);
realtime_tools::AsyncFunctionHandlerParams params;
params.scheduling_policy = realtime_tools::AsyncSchedulingPolicy::DETACHED;
params.clock = clock;
params.exec_rate = 500u; // 500 Hz
params.wait_until_initial_trigger = false;
params.clock = node->get_clock();
params.initialize(node, "");
async_class.initialize(params);
ASSERT_TRUE(async_class.get_handler().is_initialized());
ASSERT_FALSE(async_class.get_handler().is_running());
Expand Down