From 2be3baf0a4c5b0492fb7ad4561874956cc84d2d8 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Wed, 29 Oct 2025 15:59:24 -0500 Subject: [PATCH 1/2] Fixing problems with scheduler fast-idle mode Signed-off-by: Hartmut Kaiser --- cmake/HPX_AddModule.cmake | 8 +- .../hpx/parallel/util/foreach_partitioner.hpp | 45 +++++-- .../hpx/parallel/util/result_types.hpp | 34 +++--- .../tests/performance/benchmark_merge.cpp | 58 ++++++++- .../performance/benchmark_merge_sweep.cpp | 37 ++++++ .../include/hpx/execution_base/agent_base.hpp | 2 +- .../include/hpx/execution_base/agent_ref.hpp | 2 +- .../hpx/execution_base/this_thread.hpp | 6 +- libs/core/execution_base/src/agent_ref.cpp | 4 +- libs/core/execution_base/src/this_thread.cpp | 14 ++- .../detail/hierarchical_spawning.hpp | 36 ++++++ .../executors/detail/index_queue_spawning.hpp | 103 ++++++++++++++-- .../hpx/executors/parallel_executor.hpp | 112 +++++++++--------- .../hpx/thread_pools/scheduling_loop.hpp | 102 ++++++++-------- .../hpx/threading_base/execution_agent.hpp | 2 +- .../hpx/threading_base/scheduler_base.hpp | 10 +- .../threading_base/src/execution_agent.cpp | 6 +- .../threading_base/src/scheduler_base.cpp | 68 ++++++----- .../threading_base/src/set_thread_state.cpp | 10 +- 19 files changed, 460 insertions(+), 199 deletions(-) diff --git a/cmake/HPX_AddModule.cmake b/cmake/HPX_AddModule.cmake index 33050f65ee5a..6dea81c6c4d8 100644 --- a/cmake/HPX_AddModule.cmake +++ b/cmake/HPX_AddModule.cmake @@ -69,7 +69,9 @@ function(add_hpx_module libname modulename) FORCE ) - if(${modulename}_GLOBAL_HEADER_MODULE_GEN OR ${modulename}_MODULE_SOURCE) + if(HPX_WITH_CXX_MODULES AND (${modulename}_GLOBAL_HEADER_MODULE_GEN + OR ${modulename}_MODULE_SOURCE) + ) # Mark the module as exposing C++ modules set(cxx_modules ${HPX_ENABLED_CXX_MODULES}) list(APPEND cxx_modules ${modulename}) @@ -193,7 +195,7 @@ function(add_hpx_module libname modulename) set(global_header "${CMAKE_CURRENT_BINARY_DIR}/include/hpx/modules/${modulename}.hpp" ) - if(${modulename}_GLOBAL_HEADER_MODULE_GEN) + if(HPX_WITH_CXX_MODULES AND ${modulename}_GLOBAL_HEADER_MODULE_GEN) # generate list of macro headers to #include list(LENGTH ${modulename}_MACRO_HEADERS macro_headers) if(macro_headers GREATER 0) @@ -236,7 +238,7 @@ function(add_hpx_module libname modulename) ) set(generated_headers ${global_header}) - if(${modulename}_GLOBAL_HEADER_MODULE_GEN) + if(HPX_WITH_CXX_MODULES AND ${modulename}_GLOBAL_HEADER_MODULE_GEN) # collect all standard header files used by this module set(found_includes) hpx_collect_std_headers( diff --git a/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp b/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp index cd780e736597..7a88de382aa9 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/foreach_partitioner.hpp @@ -69,9 +69,18 @@ namespace hpx::parallel::util::detail { auto&& shape = detail::get_bulk_iteration_shape_idx(policy, first, count); - return execution::bulk_async_execute(policy.executor(), - partitioner_iteration{HPX_FORWARD(F, f)}, - reshape(HPX_MOVE(shape))); + if constexpr (hpx::is_async_execution_policy_v) + { + return execution::bulk_async_execute(policy.executor(), + partitioner_iteration{HPX_FORWARD(F, f)}, + reshape(HPX_MOVE(shape))); + } + else + { + return execution::bulk_sync_execute(policy.executor(), + partitioner_iteration{HPX_FORWARD(F, f)}, + reshape(HPX_MOVE(shape))); + } } else { @@ -101,8 +110,8 @@ namespace hpx::parallel::util::detail { template - static decltype(auto) call(ExPolicy_&& policy, FwdIter first, - std::size_t count, F1&& f1, F2&& f2, ReShape&& reshape = ReShape{}) + static auto call(ExPolicy_&& policy, FwdIter first, std::size_t count, + F1&& f1, F2&& f2, ReShape&& reshape = ReShape{}) { // inform parameter traits using scoped_executor_parameters = @@ -115,14 +124,28 @@ namespace hpx::parallel::util::detail { FwdIter last = parallel::detail::next(first, count); try { - auto&& items = detail::foreach_partition( - HPX_FORWARD(ExPolicy_, policy), first, count, - HPX_FORWARD(F1, f1), HPX_FORWARD(ReShape, reshape)); + if constexpr (std::is_void_v( + policy, first, count, f1, reshape))>) + { + detail::foreach_partition( + HPX_FORWARD(ExPolicy_, policy), first, count, + HPX_FORWARD(F1, f1), HPX_FORWARD(ReShape, reshape)); - scoped_params.mark_end_of_scheduling(); + scoped_params.mark_end_of_scheduling(); - return reduce( - HPX_MOVE(items), HPX_FORWARD(F2, f2), HPX_MOVE(last)); + return HPX_INVOKE(f2, HPX_MOVE(last)); + } + else + { + auto&& items = foreach_partition( + HPX_FORWARD(ExPolicy_, policy), first, count, + HPX_FORWARD(F1, f1), HPX_FORWARD(ReShape, reshape)); + + scoped_params.mark_end_of_scheduling(); + + return reduce( + HPX_MOVE(items), HPX_FORWARD(F2, f2), HPX_MOVE(last)); + } } catch (...) { diff --git a/libs/core/algorithms/include/hpx/parallel/util/result_types.hpp b/libs/core/algorithms/include/hpx/parallel/util/result_types.hpp index 6dee11143c24..272dd70b5a6f 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/result_types.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/result_types.hpp @@ -416,13 +416,15 @@ namespace hpx::parallel::util { namespace detail { template - in_out_result::type, + in_out_result< + typename hpx::tuple_element<0, + typename std::decay_t::iterator_tuple_type>::type, typename hpx::tuple_element<1, - typename ZipIter::iterator_tuple_type>::type> + typename std::decay_t::iterator_tuple_type>::type> get_in_out_result(ZipIter&& zipiter) { - using iterator_tuple_type = typename ZipIter::iterator_tuple_type; + using iterator_tuple_type = + typename std::decay_t::iterator_tuple_type; using result_type = in_out_result< typename hpx::tuple_element<0, iterator_tuple_type>::type, @@ -433,11 +435,7 @@ namespace hpx::parallel::util { } template - // clang-format off - requires ( - hpx::execution::experimental::is_sender_v - ) - // clang-format on + requires(hpx::execution::experimental::is_sender_v) decltype(auto) get_in_out_result(ZipIterSender&& zipiter_sender) { return hpx::execution::experimental::then( @@ -498,15 +496,17 @@ namespace hpx::parallel::util { } template - in_in_out_result::type, + in_in_out_result< + typename hpx::tuple_element<0, + typename std::decay_t::iterator_tuple_type>::type, typename hpx::tuple_element<1, - typename ZipIter::iterator_tuple_type>::type, + typename std::decay_t::iterator_tuple_type>::type, typename hpx::tuple_element<2, - typename ZipIter::iterator_tuple_type>::type> + typename std::decay_t::iterator_tuple_type>::type> get_in_in_out_result(ZipIter&& zipiter) { - using iterator_tuple_type = typename ZipIter::iterator_tuple_type; + using iterator_tuple_type = + typename std::decay_t::iterator_tuple_type; using result_type = in_in_out_result< typename hpx::tuple_element<0, iterator_tuple_type>::type, @@ -518,11 +518,7 @@ namespace hpx::parallel::util { } template - // clang-format off - requires ( - hpx::execution::experimental::is_sender_v - ) - // clang-format on + requires(hpx::execution::experimental::is_sender_v) decltype(auto) get_in_in_out_result(ZipIterSender&& zipiter_sender) { return hpx::execution::experimental::then( diff --git a/libs/core/algorithms/tests/performance/benchmark_merge.cpp b/libs/core/algorithms/tests/performance/benchmark_merge.cpp index 9ea47fb69438..ae06ef8c8cbd 100644 --- a/libs/core/algorithms/tests/performance/benchmark_merge.cpp +++ b/libs/core/algorithms/tests/performance/benchmark_merge.cpp @@ -133,6 +133,33 @@ struct hpx::execution::experimental::is_executor_parameters { }; +struct enable_fast_idle_mode +{ + template + friend void tag_override_invoke( + hpx::execution::experimental::mark_begin_execution_t, + enable_fast_idle_mode, Executor&&) + { + hpx::threads::add_scheduler_mode( + hpx::threads::policies::scheduler_mode::fast_idle_mode); + } + + template + friend void tag_override_invoke( + hpx::execution::experimental::mark_end_execution_t, + enable_fast_idle_mode, Executor&&) + { + hpx::threads::remove_scheduler_mode( + hpx::threads::policies::scheduler_mode::fast_idle_mode); + } +}; + +template <> +struct hpx::execution::experimental::is_executor_parameters< + enable_fast_idle_mode> : std::true_type +{ +}; + /////////////////////////////////////////////////////////////////////////////// template struct random_to_item_t @@ -234,7 +261,7 @@ void run_benchmark(std::size_t vector_size1, std::size_t vector_size2, double const time_seq = run_merge_benchmark_hpx( test_count, seq, first1, last1, first2, last2, dest); - hpx::this_thread::sleep_for(std::chrono::seconds(1)); + hpx::this_thread::sleep_for(std::chrono::milliseconds(200)); std::cout << "--- run_merge_benchmark_par ---" << std::endl; @@ -251,11 +278,11 @@ void run_benchmark(std::size_t vector_size1, std::size_t vector_size2, double const time_par = run_merge_benchmark_hpx( test_count, policy.with(ccs), first1, last1, first2, last2, dest); - std::cout << "--- run_merge_benchmark_par_stackless ---" << std::endl; - HPX_ITT_PAUSE(); - hpx::this_thread::sleep_for(std::chrono::seconds(1)); + std::cout << "--- run_merge_benchmark_par_stackless ---" << std::endl; + + hpx::this_thread::sleep_for(std::chrono::milliseconds(200)); HPX_ITT_RESUME(); @@ -270,6 +297,26 @@ void run_benchmark(std::size_t vector_size1, std::size_t vector_size2, HPX_ITT_PAUSE(); + std::cout << "--- run_merge_benchmark_par_stackless_fast_idle ---" + << std::endl; + + hpx::this_thread::sleep_for(std::chrono::milliseconds(200)); + + HPX_ITT_RESUME(); + + double time_par_stackless_fast_idle = 0; + { + enable_fast_idle_mode efim; + auto const stackless_policy = + hpx::execution::experimental::with_stacksize( + policy, hpx::threads::thread_stacksize::nostack); + time_par_stackless_fast_idle = run_merge_benchmark_hpx(test_count, + stackless_policy.with(ccs, efim), first1, last1, first2, last2, + dest); + } + + HPX_ITT_PAUSE(); + std::cout << "--- run_merge_benchmark_par_fork_join ---" << std::endl; double time_par_fork_join = 0; { @@ -290,6 +337,9 @@ void run_benchmark(std::size_t vector_size1, std::size_t vector_size2, hpx::util::format_to(std::cout, fmt, "par", time_par) << std::endl; hpx::util::format_to(std::cout, fmt, "par_stackless", time_par_stackless) << std::endl; + hpx::util::format_to( + std::cout, fmt, "par_stackless_fast_idle", time_par_stackless_fast_idle) + << std::endl; hpx::util::format_to(std::cout, fmt, "par_fork_join", time_par_fork_join) << std::endl; hpx::util::format_to(std::cout, fmt, "par_unseq", time_par_unseq) diff --git a/libs/core/algorithms/tests/performance/benchmark_merge_sweep.cpp b/libs/core/algorithms/tests/performance/benchmark_merge_sweep.cpp index 13ad21d5c52f..505070138f32 100644 --- a/libs/core/algorithms/tests/performance/benchmark_merge_sweep.cpp +++ b/libs/core/algorithms/tests/performance/benchmark_merge_sweep.cpp @@ -164,6 +164,33 @@ struct hpx::execution::experimental::is_executor_parameters { }; +struct enable_fast_idle_mode +{ + template + friend void tag_override_invoke( + hpx::execution::experimental::mark_begin_execution_t, + enable_fast_idle_mode, Executor&&) + { + hpx::threads::add_scheduler_mode( + hpx::threads::policies::scheduler_mode::fast_idle_mode); + } + + template + friend void tag_override_invoke( + hpx::execution::experimental::mark_end_execution_t, + enable_fast_idle_mode, Executor&&) + { + hpx::threads::remove_scheduler_mode( + hpx::threads::policies::scheduler_mode::fast_idle_mode); + } +}; + +template <> +struct hpx::execution::experimental::is_executor_parameters< + enable_fast_idle_mode> : std::true_type +{ +}; + /////////////////////////////////////////////////////////////////////////////// template struct random_to_item_t @@ -461,6 +488,11 @@ int hpx_main(hpx::program_options::variables_map& vm) run_benchmark(stackless_policy, vector_size1, vector_size2, test_count, std::random_access_iterator_tag(), alloc, "std::vector (stackless)", entropy); + + enable_fast_idle_mode efim; + run_benchmark(stackless_policy.with(efim), vector_size1, vector_size2, + test_count, std::random_access_iterator_tag(), alloc, + "std::vector (stackless, fast-idle mode)", entropy); } { @@ -481,6 +513,11 @@ int hpx_main(hpx::program_options::variables_map& vm) run_benchmark(stackless_policy, vector_size1, vector_size2, test_count, std::random_access_iterator_tag(), alloc, "hpx::compute::vector (stackless)", entropy); + + enable_fast_idle_mode efim; + run_benchmark(stackless_policy.with(efim), vector_size1, vector_size2, + test_count, std::random_access_iterator_tag(), alloc, + "hpx::compute::vector (stackless, fast-idle mode)", entropy); } return hpx::local::finalize(); diff --git a/libs/core/execution_base/include/hpx/execution_base/agent_base.hpp b/libs/core/execution_base/include/hpx/execution_base/agent_base.hpp index d595606d6071..7271bf443091 100644 --- a/libs/core/execution_base/include/hpx/execution_base/agent_base.hpp +++ b/libs/core/execution_base/include/hpx/execution_base/agent_base.hpp @@ -24,7 +24,7 @@ namespace hpx::execution_base { [[nodiscard]] virtual context_base const& context() const noexcept = 0; virtual void yield(char const* desc) = 0; - virtual void yield_k(std::size_t k, char const* desc) = 0; + virtual bool yield_k(std::size_t k, char const* desc) = 0; virtual void suspend(char const* desc) = 0; virtual void resume( hpx::threads::thread_priority priority, char const* desc) = 0; diff --git a/libs/core/execution_base/include/hpx/execution_base/agent_ref.hpp b/libs/core/execution_base/include/hpx/execution_base/agent_ref.hpp index 86067a46458a..a3748fd66752 100644 --- a/libs/core/execution_base/include/hpx/execution_base/agent_ref.hpp +++ b/libs/core/execution_base/include/hpx/execution_base/agent_ref.hpp @@ -47,7 +47,7 @@ namespace hpx::execution_base { void yield( char const* desc = "hpx::execution_base::agent_ref::yield") const; - void yield_k(std::size_t k, + bool yield_k(std::size_t k, char const* desc = "hpx::execution_base::agent_ref::yield_k") const; void suspend( char const* desc = "hpx::execution_base::agent_ref::suspend") const; diff --git a/libs/core/execution_base/include/hpx/execution_base/this_thread.hpp b/libs/core/execution_base/include/hpx/execution_base/this_thread.hpp index 1bd3189c4340..b9a19003f512 100644 --- a/libs/core/execution_base/include/hpx/execution_base/this_thread.hpp +++ b/libs/core/execution_base/include/hpx/execution_base/this_thread.hpp @@ -61,7 +61,7 @@ namespace hpx::execution_base { HPX_CORE_EXPORT void yield( char const* desc = "hpx::execution_base::this_thread::yield"); - HPX_CORE_EXPORT void yield_k(std::size_t k, + HPX_CORE_EXPORT bool yield_k(std::size_t k, char const* desc = "hpx::execution_base::this_thread::yield_k"); HPX_CORE_EXPORT void suspend( char const* desc = "hpx::execution_base::this_thread::suspend"); @@ -148,7 +148,7 @@ namespace hpx::util { namespace detail { - inline void yield_k(std::size_t k, char const* thread_name) + inline bool yield_k(std::size_t k, char const* thread_name) { #ifdef HPX_HAVE_SPINLOCK_DEADLOCK_DETECTION if (k > 32 && get_spinlock_break_on_deadlock_enabled() && @@ -158,7 +158,7 @@ namespace hpx::util { "possible deadlock detected"); } #endif - hpx::execution_base::this_thread::yield_k(k, thread_name); + return hpx::execution_base::this_thread::yield_k(k, thread_name); } } // namespace detail diff --git a/libs/core/execution_base/src/agent_ref.cpp b/libs/core/execution_base/src/agent_ref.cpp index 47ebf9d1649e..e244d9eb49c0 100644 --- a/libs/core/execution_base/src/agent_ref.cpp +++ b/libs/core/execution_base/src/agent_ref.cpp @@ -25,13 +25,13 @@ namespace hpx::execution_base { impl_->yield(desc); } - void agent_ref::yield_k(std::size_t k, const char* desc) const + bool agent_ref::yield_k(std::size_t k, const char* desc) const { HPX_ASSERT(*this == hpx::execution_base::this_thread::agent()); // verify that there are no more registered locks for this OS-thread util::verify_no_locks(); - impl_->yield_k(k, desc); + return impl_->yield_k(k, desc); } void agent_ref::suspend(const char* desc) const diff --git a/libs/core/execution_base/src/this_thread.cpp b/libs/core/execution_base/src/this_thread.cpp index 7ffd914d1467..1fc4191db54c 100644 --- a/libs/core/execution_base/src/this_thread.cpp +++ b/libs/core/execution_base/src/this_thread.cpp @@ -65,7 +65,7 @@ namespace hpx::execution_base { } void yield(char const* desc) override; - void yield_k(std::size_t k, char const* desc) override; + bool yield_k(std::size_t k, char const* desc) override; void suspend(char const* desc) override; void resume(hpx::threads::thread_priority priority, char const* desc) override; @@ -102,27 +102,32 @@ namespace hpx::execution_base { #endif } - void default_agent::yield_k(std::size_t k, char const* /* desc */) + bool default_agent::yield_k(std::size_t k, char const* /* desc */) { if (k < 4) //-V112 { + return false; } else if (k < 16) { HPX_SMT_PAUSE; + return false; } else if (k < 32 || k & 1) //-V112 { #if defined(HPX_WINDOWS) Sleep(0); + return true; #else sched_yield(); + return true; #endif } else { #if defined(HPX_WINDOWS) Sleep(1); + return true; #else // g++ -Wextra warns on {} or {0} struct timespec rqtp = {0, 0}; @@ -134,6 +139,7 @@ namespace hpx::execution_base { rqtp.tv_nsec = 1000; nanosleep(&rqtp, nullptr); + return true; #endif } } @@ -263,9 +269,9 @@ namespace hpx::execution_base { agent().yield(desc); } - void yield_k(std::size_t k, char const* desc) + bool yield_k(std::size_t k, char const* desc) { - agent().yield_k(k, desc); + return agent().yield_k(k, desc); } void suspend(char const* desc) diff --git a/libs/core/executors/include/hpx/executors/detail/hierarchical_spawning.hpp b/libs/core/executors/include/hpx/executors/detail/hierarchical_spawning.hpp index 5bde665fb8a5..282001d79d6d 100644 --- a/libs/core/executors/include/hpx/executors/detail/hierarchical_spawning.hpp +++ b/libs/core/executors/include/hpx/executors/detail/hierarchical_spawning.hpp @@ -222,6 +222,42 @@ namespace hpx::parallel::execution::detail { policy, HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...); } + template + decltype(auto) hierarchical_bulk_sync_execute( + hpx::threads::thread_description const& desc, + threads::thread_pool_base* pool, std::size_t first_thread, + std::size_t num_threads, std::size_t hierarchical_threshold, + Launch policy, F&& f, S const& shape, Ts&&... ts) + { + using result_type = detail::bulk_function_result_t; + if constexpr (!std::is_void_v) + { + return hpx::unwrap(hierarchical_bulk_async_execute_helper(desc, + pool, first_thread, num_threads, hierarchical_threshold, policy, + HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...)); + } + else + { + return hpx::unwrap(hierarchical_bulk_async_execute_void(desc, pool, + first_thread, num_threads, hierarchical_threshold, policy, + HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...)); + } + } + + template + decltype(auto) hierarchical_bulk_sync_execute( + threads::thread_pool_base* pool, std::size_t first_thread, + std::size_t num_threads, std::size_t hierarchical_threshold, + Launch policy, F&& f, S const& shape, Ts&&... ts) + { + hpx::threads::thread_description const desc( + f, "hierarchical_bulk_sync_execute"); + + return hierarchical_bulk_sync_execute(desc, pool, first_thread, + num_threads, hierarchical_threshold, policy, HPX_FORWARD(F, f), + shape, HPX_FORWARD(Ts, ts)...); + } + template decltype(auto) hierarchical_bulk_async_execute( hpx::threads::thread_description const& desc, diff --git a/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp b/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp index cecba63afa42..0594e562d2c2 100644 --- a/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp +++ b/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp @@ -256,7 +256,8 @@ namespace hpx::parallel::execution::detail { //////////////////////////////////////////////////////////////////////////// // Extend the shared state of the returned future allowing to keep alive all // data needed for the scheduling - template + template struct index_queue_bulk_state final : lcos::detail::future_data { private: @@ -298,7 +299,7 @@ namespace hpx::parallel::execution::detail { auto const& rp = hpx::resource::get_partitioner(); for (std::uint32_t i = 0; i != num_threads; ++i) { - auto thread_mask = rp.get_pu_mask( + auto const thread_mask = rp.get_pu_mask( wrapped_pu_num(i, needs_wraparound) + first_thread); for (std::uint32_t j = 0; j != overall_threads; ++j) { @@ -312,7 +313,8 @@ namespace hpx::parallel::execution::detail { } static hpx::threads::mask_type limit_mask( - hpx::threads::mask_cref_type orgmask, std::uint32_t num_threads) + hpx::threads::mask_cref_type orgmask, + std::uint32_t const num_threads) { std::uint32_t const num_cores = //-V101 hpx::threads::hardware_concurrency(); @@ -598,11 +600,31 @@ namespace hpx::parallel::execution::detail { if (main_thread_ok) { // Handle the queue for the local thread. - do_work_task(desc, pool, true, needs_wraparound, - task_function{ - hpx::intrusive_ptr(this), size, - chunk_size, local_worker_thread, reverse_placement, - allow_stealing}); + hpx::intrusive_ptr this_(this); + if constexpr (Sync) + { + // execute directly, if non-async + task_function f{HPX_MOVE(this_), + size, chunk_size, local_worker_thread, + reverse_placement, allow_stealing}; + + if (queues[local_worker_thread].data_.empty()) + { + // If the queue is empty we don't execute the task. We + // only signal that this "task" is ready. + f.finish(); + return; + } + + f(); + } + else + { + do_work_task(desc, pool, true, needs_wraparound, + task_function{HPX_MOVE(this_), + size, chunk_size, local_worker_thread, + reverse_placement, allow_stealing}); + } } } @@ -630,6 +652,69 @@ namespace hpx::parallel::execution::detail { // tasks. It also avoids an additional allocation by directly returning a // hpx::future. template + void index_queue_bulk_sync_execute_void( + hpx::threads::thread_description const& desc, + threads::thread_pool_base* pool, std::size_t first_thread, + std::size_t num_threads, Launch policy, F&& f, S const& shape, + Ts&&... ts) + { + HPX_ASSERT(pool); + + // Don't spawn tasks if there is no work to be done + if (hpx::util::begin(shape) != hpx::util::end(shape)) + { + std::size_t available_threads = pool->get_active_os_thread_count(); + using shared_state = + index_queue_bulk_state; + hpx::intrusive_ptr p( + new shared_state(first_thread, num_threads, available_threads, + HPX_MOVE(policy), HPX_FORWARD(F, f), shape, + HPX_FORWARD(Ts, ts)...), + false); + + p->execute(desc, pool); + p->wait(); + } + } + + template + decltype(auto) index_queue_bulk_sync_execute( + hpx::threads::thread_description const& desc, + threads::thread_pool_base* pool, std::size_t first_thread, + std::size_t num_threads, std::size_t hierarchical_threshold, + Launch policy, F&& f, S const& shape, Ts&&... ts) + { + using result_type = detail::bulk_function_result_t; + if constexpr (!std::is_void_v) + { + return hierarchical_bulk_sync_execute_helper(desc, pool, + first_thread, num_threads, hierarchical_threshold, policy, + HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...); + } + else + { + return index_queue_bulk_sync_execute_void(desc, pool, first_thread, + num_threads, policy, HPX_FORWARD(F, f), shape, + HPX_FORWARD(Ts, ts)...); + } + } + + template + decltype(auto) index_queue_bulk_sync_execute( + threads::thread_pool_base* pool, std::size_t first_thread, + std::size_t num_threads, std::size_t hierarchical_threshold, + Launch policy, F&& f, S const& shape, Ts&&... ts) + { + hpx::threads::thread_description const desc( + f, "hierarchical_bulk_sync_execute"); + + return index_queue_bulk_sync_execute(desc, pool, first_thread, + num_threads, hierarchical_threshold, policy, HPX_FORWARD(F, f), + shape, HPX_FORWARD(Ts, ts)...); + } + + //////////////////////////////////////////////////////////////////////////// + template decltype(auto) index_queue_bulk_async_execute_void( hpx::threads::thread_description const& desc, threads::thread_pool_base* pool, std::size_t first_thread, @@ -645,7 +730,7 @@ namespace hpx::parallel::execution::detail { } std::size_t available_threads = pool->get_active_os_thread_count(); - using shared_state = index_queue_bulk_state; + using shared_state = index_queue_bulk_state; hpx::intrusive_ptr p( new shared_state(first_thread, num_threads, available_threads, HPX_MOVE(policy), HPX_FORWARD(F, f), shape, diff --git a/libs/core/executors/include/hpx/executors/parallel_executor.hpp b/libs/core/executors/include/hpx/executors/parallel_executor.hpp index ee61a3ed5a36..a3afc0152128 100644 --- a/libs/core/executors/include/hpx/executors/parallel_executor.hpp +++ b/libs/core/executors/include/hpx/executors/parallel_executor.hpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -87,7 +86,7 @@ namespace hpx::execution { struct parallel_policy_executor { /// Associate the parallel_execution_tag executor tag type as a default - /// with this executor, except if the given launch policy is synch. + /// with this executor, except if the given launch policy is sync. using execution_category = std::conditional_t, sequenced_execution_tag, parallel_execution_tag>; @@ -181,12 +180,8 @@ namespace hpx::execution { // property implementations #if defined(HPX_HAVE_THREAD_DESCRIPTION) - // clang-format off - template - )> - // clang-format on + template + requires(std::is_convertible_v) friend constexpr auto tag_invoke( hpx::execution::experimental::with_annotation_t, Executor_ const& exec, char const* annotation) @@ -196,12 +191,8 @@ namespace hpx::execution { return exec_with_annotation; } - // clang-format off - template - )> - // clang-format on + template + requires(std::is_convertible_v) friend auto tag_invoke(hpx::execution::experimental::with_annotation_t, Executor_ const& exec, std::string annotation) { @@ -219,12 +210,8 @@ namespace hpx::execution { } #endif - // clang-format off - template - )> - // clang-format on + template + requires(std::is_convertible_v) friend constexpr auto tag_invoke( hpx::execution::experimental::with_processing_units_count_t, Executor_ const& exec, std::size_t num_cores) noexcept @@ -234,12 +221,8 @@ namespace hpx::execution { return exec_with_num_cores; } - // clang-format off - template - )> - // clang-format on + template + requires(hpx::traits::is_executor_parameters_v) friend constexpr std::size_t tag_invoke( hpx::execution::experimental::processing_units_count_t, Parameters&&, parallel_policy_executor const& exec, @@ -249,12 +232,8 @@ namespace hpx::execution { return exec.get_num_cores(); } - // clang-format off - template - )> - // clang-format on + template + requires(std::is_convertible_v) friend constexpr auto tag_invoke( hpx::execution::experimental::with_first_core_t, Executor_ const& exec, std::size_t first_core) noexcept @@ -424,12 +403,45 @@ namespace hpx::execution { } // BulkTwoWayExecutor interface - // clang-format off - template - )> - // clang-format on + template + requires(!std::is_integral_v) + friend decltype(auto) tag_invoke( + hpx::parallel::execution::bulk_sync_execute_t, + parallel_policy_executor const& exec, F&& f, S const& shape, + Ts&&... ts) + { +#if defined(HPX_HAVE_THREAD_DESCRIPTION) + hpx::threads::thread_description desc(f, exec.annotation_); +#else + hpx::threads::thread_description desc(f); +#endif + auto pool = exec.pool_ ? + exec.pool_ : + threads::detail::get_self_or_default_pool(); + + // use scheduling based on index_queue if no hierarchical threshold + // is given + bool const do_not_combine_tasks = + hpx::threads::do_not_combine_tasks( + exec.policy().get_hint().sharing_mode()); + + if (exec.hierarchical_threshold_ == 0 && !do_not_combine_tasks) + { + return parallel::execution::detail:: + index_queue_bulk_sync_execute(desc, pool, + exec.get_first_core(), exec.get_num_cores(), + exec.hierarchical_threshold_, exec.policy_, + HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...); + } + + return parallel::execution::detail::hierarchical_bulk_sync_execute( + desc, pool, exec.get_first_core(), exec.get_num_cores(), + exec.hierarchical_threshold_, exec.policy_, HPX_FORWARD(F, f), + shape, HPX_FORWARD(Ts, ts)...); + } + + template + requires(!std::is_integral_v) friend decltype(auto) tag_invoke( hpx::parallel::execution::bulk_async_execute_t, parallel_policy_executor const& exec, F&& f, S const& shape, @@ -465,12 +477,8 @@ namespace hpx::execution { shape, HPX_FORWARD(Ts, ts)...); } - // clang-format off - template - )> - // clang-format on + template + requires(!std::is_integral_v) friend decltype(auto) tag_invoke( hpx::parallel::execution::bulk_then_execute_t, parallel_policy_executor const& exec, F&& f, S const& shape, @@ -576,12 +584,8 @@ namespace hpx::execution { }; // support all properties exposed by the embedded policy - // clang-format off - template - )> - // clang-format on + template + requires(hpx::execution::experimental::is_scheduling_property_v) auto tag_invoke( Tag tag, parallel_policy_executor const& exec, Property&& prop) -> decltype(std::declval>().policy( @@ -594,12 +598,8 @@ namespace hpx::execution { return exec_with_prop; } - // clang-format off - template - )> - // clang-format on + template + requires(hpx::execution::experimental::is_scheduling_property_v) auto tag_invoke(Tag tag, parallel_policy_executor const& exec) -> decltype(std::declval()(std::declval())) { diff --git a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp index e310164d1ec4..30b5ab57b0ef 100644 --- a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp +++ b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp @@ -528,15 +528,6 @@ namespace hpx::threads::detail { } } -#if defined(HPX_HAVE_MODULE_ASYNC_MPI) || \ - defined(HPX_HAVE_MODULE_ASYNC_CUDA) || defined(HPX_HAVE_MODULE_ASYNC_SYCL) - if (scheduler.custom_polling_function() == - policies::detail::polling_status::busy) - { - idle_loop_count = 0; - } -#endif - // something went badly wrong, give up if (HPX_UNLIKELY(this_state.load(std::memory_order_relaxed) == hpx::state::terminating)) @@ -557,7 +548,21 @@ namespace hpx::threads::detail { background_running, idle_loop_count); } } - else if (idle_loop_count > params.max_idle_loop_count_ || may_exit) + else if (idle_loop_count > params.max_idle_loop_count_) + { + idle_loop_count = 0; + + // call back into invoking context + if (!params.outer_.empty()) + { + params.outer_(); + context_storage = hpx::execution_base::this_thread::detail:: + get_agent_storage(); + } + + scheduler.SchedulingPolicy::cleanup_terminated(true); + } + else if (may_exit) { if (idle_loop_count > params.max_idle_loop_count_) idle_loop_count = 0; @@ -571,53 +576,54 @@ namespace hpx::threads::detail { } // break if we were idling after 'may_exit' - if (may_exit) - { - HPX_ASSERT(this_state.load(std::memory_order_relaxed) != - hpx::state::pre_sleep); - - if (background_thread) - { - HPX_ASSERT(background_running); - *background_running = false; - - // do background work in parcel layer and in agas - [[maybe_unused]] bool const has_exited = - call_background_thread(background_thread, next_thrd, - scheduler, num_thread, bg_work_exec_time_init, - context_storage); + HPX_ASSERT(this_state.load(std::memory_order_relaxed) != + hpx::state::pre_sleep); - // the background thread should have exited - HPX_ASSERT(has_exited); + if (background_thread) + { + HPX_ASSERT(background_running); + *background_running = false; - background_thread.reset(); - background_running.reset(); - } - else - { - bool const can_exit = !running && - scheduler.SchedulingPolicy::cleanup_terminated( - true) && - scheduler.SchedulingPolicy::get_thread_count( - thread_schedule_state::suspended, - thread_priority::default_, num_thread) == 0 && - scheduler.SchedulingPolicy::get_queue_length( - num_thread) == 0; + // do background work in parcel layer and in agas + [[maybe_unused]] bool const has_exited = + call_background_thread(background_thread, next_thrd, + scheduler, num_thread, bg_work_exec_time_init, + context_storage); - if (can_exit) - { - this_state.store(hpx::state::stopped); - break; - } - } + // the background thread should have exited + HPX_ASSERT(has_exited); - may_exit = false; + background_thread.reset(); + background_running.reset(); } else { - scheduler.SchedulingPolicy::cleanup_terminated(true); + bool const can_exit = !running && + scheduler.SchedulingPolicy::cleanup_terminated(true) && + scheduler.SchedulingPolicy::get_thread_count( + thread_schedule_state::suspended, + thread_priority::default_, num_thread) == 0 && + scheduler.SchedulingPolicy::get_queue_length( + num_thread) == 0; + + if (can_exit) + { + this_state.store(hpx::state::stopped); + break; + } } + + may_exit = false; } + +#if defined(HPX_HAVE_MODULE_ASYNC_MPI) || \ + defined(HPX_HAVE_MODULE_ASYNC_CUDA) || defined(HPX_HAVE_MODULE_ASYNC_SYCL) + if (scheduler.custom_polling_function() == + policies::detail::polling_status::busy) + { + idle_loop_count = 0; + } +#endif } } } // namespace hpx::threads::detail diff --git a/libs/core/threading_base/include/hpx/threading_base/execution_agent.hpp b/libs/core/threading_base/include/hpx/threading_base/execution_agent.hpp index 0bc34b287a6a..99d0833258fb 100644 --- a/libs/core/threading_base/include/hpx/threading_base/execution_agent.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/execution_agent.hpp @@ -49,7 +49,7 @@ namespace hpx::threads { } void yield(char const* desc) override; - void yield_k(std::size_t k, char const* desc) override; + bool yield_k(std::size_t k, char const* desc) override; void suspend(char const* desc) override; void resume( hpx::threads::thread_priority priority, char const* desc) override; diff --git a/libs/core/threading_base/include/hpx/threading_base/scheduler_base.hpp b/libs/core/threading_base/include/hpx/threading_base/scheduler_base.hpp index 0974fc98245a..d09fceb06f1a 100644 --- a/libs/core/threading_base/include/hpx/threading_base/scheduler_base.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/scheduler_base.hpp @@ -283,14 +283,14 @@ namespace hpx::threads::policies { #if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF) // support for suspension on idle queues - pu_mutex_type mtx_; - std::condition_variable cond_; + double max_idle_backoff_time_; struct idle_backoff_data { - std::uint32_t wait_count_; - double max_idle_backoff_time_; + pu_mutex_type wait_mtx; + std::condition_variable wait_cond; + std::uint32_t wait_count = 0; }; - std::vector> wait_counts_; + std::vector> wait_count_data_; #endif // support for suspension of pus diff --git a/libs/core/threading_base/src/execution_agent.cpp b/libs/core/threading_base/src/execution_agent.cpp index 53fac19f6a2d..a47d08fdc52e 100644 --- a/libs/core/threading_base/src/execution_agent.cpp +++ b/libs/core/threading_base/src/execution_agent.cpp @@ -62,24 +62,28 @@ namespace hpx::threads { do_yield(desc, hpx::threads::thread_schedule_state::pending); } - void execution_agent::yield_k(std::size_t k, char const* desc) + bool execution_agent::yield_k(std::size_t k, char const* desc) { if (k < 4) //-V112 { + return false; } #if defined(HPX_SMT_PAUSE) else if (k < 16) { HPX_SMT_PAUSE; + return false; } #endif else if (k < 32 || k & 1) //-V112 { do_yield(desc, hpx::threads::thread_schedule_state::pending_boost); + return true; } else { do_yield(desc, hpx::threads::thread_schedule_state::pending); + return true; } } diff --git a/libs/core/threading_base/src/scheduler_base.cpp b/libs/core/threading_base/src/scheduler_base.cpp index e28f212b7043..a1b92a699339 100644 --- a/libs/core/threading_base/src/scheduler_base.cpp +++ b/libs/core/threading_base/src/scheduler_base.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2023 Hartmut Kaiser +// Copyright (c) 2007-2025 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -39,7 +40,12 @@ namespace hpx::threads::policies { char const* description, thread_queue_init_parameters const& thread_queue_init, scheduler_mode mode) - : suspend_mtxs_(num_threads) + : mode_(mode) +#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF) + , max_idle_backoff_time_(thread_queue_init.max_idle_backoff_time_) + , wait_count_data_(num_threads) +#endif + , suspend_mtxs_(num_threads) , suspend_conds_(num_threads) , pu_mtxs_(num_threads) , states_(num_threads) @@ -56,17 +62,6 @@ namespace hpx::threads::policies { { scheduler_base::set_scheduler_mode(mode); -#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF) - double const max_time = thread_queue_init.max_idle_backoff_time_; - - wait_counts_.resize(num_threads); - for (auto&& data : wait_counts_) - { - data.data_.wait_count_ = 0; - data.data_.max_idle_backoff_time_ = max_time; - } -#endif - for (std::size_t i = 0; i != num_threads; ++i) states_[i].data_.store(hpx::state::initialized); } @@ -77,30 +72,32 @@ namespace hpx::threads::policies { if (mode_.data_.load(std::memory_order_relaxed) & policies::scheduler_mode::enable_idle_backoff) { +#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX) + static hpx::util::itt::event notify_event("idle_callback"); + hpx::util::itt::mark_event e(notify_event); +#endif // Put this thread to sleep for some time, additionally it gets // woken up on new work. - idle_backoff_data& data = wait_counts_[num_thread].data_; + auto& data = wait_count_data_[num_thread].data_; // Exponential back-off with a maximum sleep time. - static constexpr std::int64_t const max_exponent = - std::numeric_limits::max_exponent; + constexpr double max_exponent = + std::numeric_limits::max_exponent - 1; double const exponent = - (std::min) (static_cast(data.wait_count_), - static_cast(max_exponent - 1)); + (std::min) (static_cast(data.wait_count), max_exponent); - std::chrono::milliseconds const period( - std::lround((std::min) (data.max_idle_backoff_time_, - std::pow(2.0, exponent)))); + std::chrono::microseconds const period(std::lround( + (std::min) (max_idle_backoff_time_, std::pow(2.0, exponent)))); - ++data.wait_count_; + ++data.wait_count; - std::unique_lock l(mtx_); - if (cond_.wait_for(l, period) == //-V1089 + std::unique_lock l(data.wait_mtx); + if (data.wait_cond.wait_for(l, period) == //-V1089 std::cv_status::no_timeout) { // reset counter if thread was woken up - data.wait_count_ = 0; + data.wait_count = 0; } } #endif @@ -109,13 +106,30 @@ namespace hpx::threads::policies { /// This function gets called by the thread-manager whenever new work /// has been added, allowing the scheduler to reactivate one or more of /// possibly idling OS threads - void scheduler_base::do_some_work(std::size_t) + void scheduler_base::do_some_work([[maybe_unused]] std::size_t num_thread) { #if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF) if (mode_.data_.load(std::memory_order_relaxed) & policies::scheduler_mode::enable_idle_backoff) { - cond_.notify_all(); +#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX) + static hpx::util::itt::event notify_event("do_some_work"); + hpx::util::itt::mark_event e(notify_event); +#endif + //if (num_thread == static_cast(-1)) + //{ + auto const size = wait_count_data_.size(); + for (std::size_t i = 0; i != size; ++i) + { + wait_count_data_[i].data_.wait_count = 0; + wait_count_data_[i].data_.wait_cond.notify_one(); + } + //} + //else + //{ + // wait_count_data_[num_thread].data_.wait_count = 0; + // wait_count_data_[num_thread].data_.wait_cond.notify_one(); + //} } #endif } diff --git a/libs/core/threading_base/src/set_thread_state.cpp b/libs/core/threading_base/src/set_thread_state.cpp index b88d9b552544..871573390eb4 100644 --- a/libs/core/threading_base/src/set_thread_state.cpp +++ b/libs/core/threading_base/src/set_thread_state.cpp @@ -100,8 +100,7 @@ namespace hpx::threads::detail { thread_state set_thread_state(thread_id_type const& thrd, thread_schedule_state const new_state, thread_restart_state const new_state_ex, thread_priority const priority, - thread_schedule_hint schedulehint, bool const retry_on_active, - error_code& ec) + thread_schedule_hint schedulehint, bool retry_on_active, error_code& ec) { if (HPX_UNLIKELY(!thrd)) { @@ -158,8 +157,11 @@ namespace hpx::threads::detail { priority, previous_state, schedulehint, ec); } - hpx::execution_base::this_thread::yield_k( - k, "hpx::threads::detail::set_thread_state"); + if (hpx::execution_base::this_thread::yield_k( + k, "hpx::threads::detail::set_thread_state")) + { + retry_on_active = true; // don't wait too long + } ++k; LTM_(warning).format( From a29b1c422813f75539f631ddd677439adbef1a29 Mon Sep 17 00:00:00 2001 From: Panos Syskakis Date: Mon, 3 Nov 2025 16:47:30 -0600 Subject: [PATCH 2/2] Prevent staggler thread from blocking progress in index_queue_spawning --- .../executors/detail/index_queue_spawning.hpp | 33 +++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp b/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp index 0594e562d2c2..622818ececcc 100644 --- a/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp +++ b/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp @@ -111,8 +111,9 @@ namespace hpx::parallel::execution::detail { // Perform the work in one element indexed by index. The index // represents a range of indices (iterators) in the given shape. + // Returns the number of processed shape elements. template - void do_work() const + std::uint32_t do_work() const { // explicitly copy function (object) and arguments auto f = state->f; @@ -122,6 +123,7 @@ namespace hpx::parallel::execution::detail { hpx::detail::fused_index_pack_t; hpx::optional index; + std::uint32_t processed = 0; // Handle local queue first auto& local_queue = state->queues[worker_thread].data_; @@ -140,6 +142,7 @@ namespace hpx::parallel::execution::detail { { bulk_invoke_helper(index_pack_type{}, f, *it, ts); } + ++processed; } if (allow_stealing) @@ -175,9 +178,11 @@ namespace hpx::parallel::execution::detail { { bulk_invoke_helper(index_pack_type{}, f, *it, ts); } + ++processed; } } } + return processed; } // Store an exception and mark that an exception was thrown in the @@ -194,9 +199,13 @@ namespace hpx::parallel::execution::detail { // thread to finish, it will only decrement the counter. If it is the // last thread it will call set_exception if there is an exception. // Otherwise, it will call set_value on the shared state. - void finish() const noexcept + void finish(std::uint32_t processed) const noexcept { - if (--(state->tasks_remaining.data_) == 0) + // Subtract how many chunks have been processed by this thread + std::uint32_t prev_value = state->tasks_remaining.data_.fetch_sub( + processed, std::memory_order_acq_rel); + bool const is_last = (prev_value != 0 && prev_value == processed); + if (is_last) { if (state->bad_alloc_thrown.load(std::memory_order_relaxed)) { @@ -227,17 +236,20 @@ namespace hpx::parallel::execution::detail { // on the shared state. void operator()() const noexcept { + std::uint32_t processed = 0; try { // Execute task function if (reverse_placement) { // schedule chunks from the end, if needed - do_work(); + processed = do_work< + hpx::concurrency::detail::queue_end::right>(); } else { - do_work(); + processed = do_work< + hpx::concurrency::detail::queue_end::left>(); } } catch (std::bad_alloc const&) @@ -249,7 +261,7 @@ namespace hpx::parallel::execution::detail { store_exception(std::current_exception()); } - finish(); + finish(processed); } }; @@ -387,7 +399,7 @@ namespace hpx::parallel::execution::detail { { // If the queue is empty we don't spawn a task. We only signal // that this "task" is ready. - task_f.finish(); + task_f.finish(0); return; } @@ -459,7 +471,8 @@ namespace hpx::parallel::execution::detail { , pu_mask(full_mask()) , queues(num_threads) { - tasks_remaining.data_.store(num_threads, std::memory_order_relaxed); + auto const size = static_cast(hpx::util::size(shape)); + tasks_remaining.data_.store(size, std::memory_order_relaxed); HPX_ASSERT(hpx::threads::count(pu_mask) == available_threads); } @@ -485,7 +498,7 @@ namespace hpx::parallel::execution::detail { if (num_chunks < static_cast(num_threads)) { num_threads = num_chunks; - tasks_remaining.data_ = num_chunks; + //tasks_remaining.data_ = num_chunks; pu_mask = limit_mask(pu_mask, num_chunks); available_threads = (std::min) (available_threads, num_threads); @@ -612,7 +625,7 @@ namespace hpx::parallel::execution::detail { { // If the queue is empty we don't execute the task. We // only signal that this "task" is ready. - f.finish(); + f.finish(0); return; }