
Conversation

@charan-003
Contributor

@charan-003 charan-003 commented Mar 27, 2025

Implement P2079R7 parallel_scheduler in HPX Core

This issue tracks the implementation of hpx::execution::experimental::parallel_scheduler in HPX core, aligning with P2079R7 (Parallel scheduler, Section 4.1) by wrapping thread_pool_policy_scheduler with hpx::launch policies. The goal is to provide a standards-compliant parallel scheduler with global access, cancellation support, task chaining, and bulk operations.

Implementation Checklist

  • Phase 1: Core Implementation

  • Implement hpx::execution::experimental::parallel_scheduler class per P2079R7 Section 4.1.

  • Add schedule() returning parallel_sender with sender_concept and completion_signatures:

  • Supports set_value_t(), set_stopped_t(), set_error_t(std::exception_ptr).

  • Wrap thread_pool_policy_scheduler with hpx::launch (async/sync policies).

  • Implement noexcept move/copy constructors and assignment operators.

  • Add operator== returning true for scheduler comparison.

  • Return forward_progress_guarantee::parallel via get_forward_progress_guarantee_t.

  • Implement then operation for task chaining with parallel_sender.

  • Rely on STDEXEC for the underlying sender/receiver primitives.

  • Phase 2: Bulk Support

  • Extend bulk_t customization to delegate to thread_pool_scheduler_bulk.

  • Implement bulk_sender for parallel bulk execution per P2079R7.

  • Support bulk_chunked and bulk_unchunked operations.

  • Phase 3: Replaceability API

  • Implement std::execution::system_context_replaceability namespace per P2079R7 Section 4.2.

  • Add query_parallel_scheduler_backend() returning shared_ptr<parallel_scheduler>.

  • Support link-time replaceability using weak symbols.

  • Implement receiver and bulk_item_receiver interfaces for frontend-backend interaction.

  • Ensure storage handling for scheduling operations.
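The checklist above revolves around the standard sender/receiver vocabulary (schedule, connect, start, and the set_value/set_error/set_stopped completion signals). As a rough illustration of how those pieces fit together, here is a minimal standard-C++ toy; everything under `toy::` is hypothetical and ignores the real HPX/stdexec machinery, thread pools, and completion-signature metaprogramming:

```cpp
// Toy sketch of the sender/receiver surface described in the checklist.
// Everything in toy:: is hypothetical; the real implementation wraps
// thread_pool_policy_scheduler and supports cancellation and bulk operations.
#include <cassert>
#include <exception>
#include <utility>

namespace toy {

    // Receiver: consumes exactly one completion signal.
    struct receiver
    {
        int* result;
        void set_value(int v) noexcept { *result = v; }
        void set_error(std::exception_ptr) noexcept {}
        void set_stopped() noexcept {}
    };

    // Operation state: produced by connect(), started exactly once.
    template <typename R>
    struct operation_state
    {
        R rcv;
        void start() noexcept
        {
            rcv.set_value(42);    // "run" the scheduled work inline
        }
    };

    // Sender: describes work; connect() pairs it with a receiver.
    struct sender
    {
        template <typename R>
        operation_state<R> connect(R r)
        {
            return {std::move(r)};
        }
    };

    // Scheduler: schedule() returns a sender completing on "its" context.
    struct parallel_scheduler
    {
        sender schedule() const noexcept { return {}; }

        // All instances refer to the same global context: they compare equal.
        friend bool operator==(parallel_scheduler, parallel_scheduler) noexcept
        {
            return true;
        }
    };
}    // namespace toy
```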

@StellarBot

Can one of the admins verify this patch?

@codacy-production

codacy-production bot commented Mar 28, 2025

Coverage summary from Codacy

See diff coverage on Codacy

Coverage variation: +0.45%    Diff coverage: 94.84%

Coverage variation details
                                   Coverable lines    Covered lines      Coverage
Common ancestor commit (6e3a98b)   193599             164650             85.05%
Head commit (1267582)              214837 (+21238)    183682 (+19032)    85.50% (+0.45%)

Coverage variation is the difference between the coverage for the head and common ancestor commits of the pull request branch: <coverage of head commit> - <coverage of common ancestor commit>

Diff coverage details
                       Coverable lines    Covered lines    Diff coverage
Pull request (#6655)   155                147              94.84%

Diff coverage is the percentage of lines that are covered by tests out of the coverable lines that the pull request added or modified: <covered lines added or modified>/<coverable lines added or modified> * 100%


@hkaiser hkaiser added type: enhancement type: compatibility issue category: senders/receivers Implementations of the p0443r14 / p2300 + p1897 proposals labels Mar 28, 2025
Contributor Author

@charan-003 charan-003 left a comment

clang format

@charan-003
Contributor Author

Update:

  • Moved parallel_scheduler, parallel_sender, and bulk_sender to a new header: hpx/executors/parallel_scheduler.hpp
  • Removed related code from thread_pool_scheduler.hpp

@hkaiser — ready for re-review when convenient. Thanks for the guidance!

@charan-003
Contributor Author

@hkaiser Can you please verify it now? I'm not sure why those 2 tests are failing.

@charan-003 charan-003 marked this pull request as ready for review April 1, 2025 19:46
Member

@hkaiser hkaiser left a comment

Wow, I'm impressed! I have a couple of minor comments, though.


# Default location is $HPX_ROOT/libs/executors/include
set(executors_headers
hpx/executors/parallel_scheduler.hpp
Member

Would you mind retaining an alphabetical order of the files?

: scheduler(HPX_FORWARD(S, s))
, receiver(r)
{
// std::cout << "Operation state created" << std::endl;
Member

Please remove commented code

parallel_scheduler(const parallel_scheduler&) noexcept = default;
parallel_scheduler(parallel_scheduler&&) noexcept = default;
parallel_scheduler& operator=(
const parallel_scheduler&) noexcept = default;
Member

Please use east const syntax everywhere.

hpx::detail::try_catch_exception_ptr(
[&]() {
thread_pool_scheduler exec{os.scheduler.get_thread_pool()};
for (Shape i = 0; i < os.shape; ++i)
Member

Isn't the parallel_scheduler supposed to run the tasks concurrently?

Member

@hkaiser hkaiser Apr 2, 2025

FWIW, you should be able to rely on the bulk implementation for the thread_pool_scheduler here: https://github.com/STEllAR-GROUP/hpx/blob/master/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp

return lhs.pool_ == rhs.pool_;
}

hpx::threads::thread_pool_base* get_thread_pool() const noexcept
Member

Suggested change
hpx::threads::thread_pool_base* get_thread_pool() const noexcept
constexpr hpx::threads::thread_pool_base* get_thread_pool() const noexcept

hpx::execution::experimental::set_stopped_t()>;

template <typename Env>
friend auto tag_invoke(
Member

Suggested change
friend auto tag_invoke(
friend constexpr auto tag_invoke(

hpx::execution::experimental::set_stopped_t()>;

template <typename Env>
friend auto tag_invoke(
Member

Suggested change
friend auto tag_invoke(
friend constexpr auto tag_invoke(

{
exec.execute([i, &os]() mutable {
// std::cout << "Bulk task executing for index: " << i <<;
os.f(i);
Member

Suggested change
os.f(i);
HPX_INVOKE(os.f, i);

exec.execute([i, &os]() mutable {
// std::cout << "Bulk task executing for index: " << i <<;
os.f(i);
if (--(*os.tasks_remaining) == 0)
Member

Using std::atomic_ref instead of a std::shared_ptr would be preferable (at least for C++20)
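A minimal sketch of the suggested direction: keep a plain std::atomic counter by value inside the (stable-address) operation state instead of a shared_ptr-managed one, and let the last task to finish signal completion. bulk_state and run_bulk are hypothetical stand-ins; std::atomic_ref would apply if the counter had to remain a non-atomic integer.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Sketch: last-task-out signals completion without a shared_ptr'd counter.
// The counter lives by value in the operation state, which outlives the tasks.
struct bulk_state
{
    std::atomic<std::size_t> tasks_remaining;
    bool completed = false;

    void task_done()
    {
        // fetch_sub returns the previous value; 1 means we were the last task.
        if (tasks_remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
            completed = true;    // stands in for set_value(receiver)
    }
};

inline void run_bulk(bulk_state& st, std::size_t n)
{
    st.tasks_remaining.store(n, std::memory_order_relaxed);
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i != n; ++i)
        workers.emplace_back([&st] { st.task_done(); });
    for (auto& t : workers)
        t.join();
}
```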

}

template <typename Shape, typename F>
friend auto tag_invoke(bulk_t, parallel_sender&& s, Shape shape, F&& f)
Member

For bulk, we have implemented an extension allowing to pass an arbitrary range as the Shape (in addition to the integral value), see here, for instance: https://github.com/STEllAR-GROUP/hpx/blob/master/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp#L273-L300. It might be a good idea to support this here as well.
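A sketch of what such a Shape extension can look like, dispatching between an integral count and an arbitrary range with if constexpr. bulk_for_each is a hypothetical free function, not HPX's actual customization point:

```cpp
#include <cassert>
#include <type_traits>
#include <vector>

// Sketch of the Shape extension: accept either an integral count or an
// arbitrary range. (Hypothetical helper; the real logic lives in
// hpx/execution/algorithms/bulk.hpp.)
template <typename Shape, typename F>
void bulk_for_each(Shape&& shape, F&& f)
{
    if constexpr (std::is_integral_v<std::decay_t<Shape>>)
    {
        // Integral shape: invoke f with each index in [0, shape).
        for (std::decay_t<Shape> i = 0; i != shape; ++i)
            f(i);
    }
    else
    {
        // Range shape: invoke f with each element of the range.
        for (auto&& elem : shape)
            f(elem);
    }
}
```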

@hkaiser
Member

hkaiser commented Apr 2, 2025

Also, you may want to move all types that don't have to be visible to users into namespace detail to avoid name clashes with other code.

@charan-003
Contributor Author

Also, you may want to move all types that don't have to be visible to users into namespace detail to avoid name clashes with other code.

Sure, I'll do that soon
Thank you so much

@charan-003
Contributor Author

I'm not sure why the clang-format check is failing. I ran `clang-format -i` to make sure the files are formatted.

@charan-003
Contributor Author

@hkaiser
The parallel_bulk_operation_state implements a custom chunking strategy for bulk operations. It divides the workload into chunks based on the number of threads in the underlying thread_pool_scheduler's pool (get_os_thread_count()), then schedules each chunk as a separate task using exec.execute().

The custom bulk chunking might duplicate functionality in thread_pool_scheduler_bulk.hpp.

It offers a parallel execution option that leverages HPX's thread pool, maintaining high performance for CPU-bound tasks.

While this version lacks P2079R7's replaceability API (Section 4.2), among other things, it could be extended to support it via query_parallel_scheduler_backend(). This would let HPX users swap in custom schedulers (integrating with GPUs or other runtimes like oneTBB), enhancing HPX's flexibility and composability.

will try to work on this :))
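The chunk computation described here can be sketched with a small helper. make_chunks is a hypothetical illustration (it assumes num_threads >= 1); HPX's thread_pool_scheduler_bulk handles this, and more, internally:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Sketch of the chunking described above: split `size` items over
// `num_threads` workers; each [start, end) pair would be scheduled as one
// task via exec.execute(). Precondition: num_threads >= 1.
inline std::vector<std::pair<std::size_t, std::size_t>> make_chunks(
    std::size_t size, std::size_t num_threads)
{
    // Round up so every item lands in exactly one chunk.
    std::size_t const chunk_size = (size + num_threads - 1) / num_threads;
    std::vector<std::pair<std::size_t, std::size_t>> chunks;
    for (std::size_t t = 0; t != num_threads; ++t)
    {
        std::size_t const start = t * chunk_size;
        if (start >= size)
            break;    // fewer chunks than threads when size is small
        chunks.emplace_back(start, std::min(start + chunk_size, size));
    }
    return chunks;
}
```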

@codacy-production

codacy-production bot commented Apr 3, 2025

Coverage summary from Codacy

See diff coverage on Codacy

Coverage variation: -0.30%    Diff coverage: 88.81%

Coverage variation details
                                   Coverable lines    Covered lines      Coverage
Common ancestor commit (15afc8b)   244963             210174             85.80%
Head commit (7025dfd)              219167 (-25796)    187374 (-22800)    85.49% (-0.30%)

Coverage variation is the difference between the coverage for the head and common ancestor commits of the pull request branch: <coverage of head commit> - <coverage of common ancestor commit>

Diff coverage details
                       Coverable lines    Covered lines    Diff coverage
Pull request (#6655)   268                238              88.81%

Diff coverage is the percentage of lines that are covered by tests out of the coverable lines that the pull request added or modified: <covered lines added or modified>/<coverable lines added or modified> * 100%


@hkaiser
Member

hkaiser commented Apr 4, 2025

I'm not sure why the clang-format check is failing. I ran `clang-format -i` to make sure the files are formatted.

The CI uses an older version of clang-format that does not always agree with newer versions on how to format things. Simply surround the offending lines with

// clang-format off
    ....
// clang-format on

to disable the check for those lines.

Comment on lines 197 to 232
for (std::size_t t = 0; t < num_threads; ++t)
{
    std::size_t start = t * chunk_size;
    std::size_t end =
        (std::min)(start + chunk_size, os.size);
    if (start >= os.size)
        break;

    exec.execute([start, end, &os]() mutable {
        if constexpr (std::is_integral_v<Shape>)
        {
            for (std::size_t i = start; i < end; ++i)
            {
                HPX_INVOKE(os.f, static_cast<Shape>(i));
            }
        }
        else
        {
            // clang-format off
            auto it = std::next(
                hpx::util::begin(os.shape), start);
            for (std::size_t i = start; i < end;
                ++i, ++it)
            // clang-format on
            {
                HPX_INVOKE(os.f, *it);
            }
        }
        if (--(*os.tasks_remaining) == 0)
        {
            hpx::execution::experimental::set_value(
                os.receiver);
        }
    });
}
Member

This will still execute the tasks sequentially and not concurrently.

As I said before, wouldn't we be able to simply create a type alias using parallel_scheduler = thread_pool_scheduler; instead of trying to re-implement all of it?

If there is an API difference between parallel_scheduler and the existing thread_pool_scheduler (beyond the name), or if there is a semantic difference, please highlight that. In that case a simple wrapper may still be feasible.

Contributor Author

@hkaiser
Thanks for your feedback! I've updated it to use hpx::async, hpx::future, and hpx::when_all in default_parallel_scheduler to ensure concurrent execution of bulk tasks (chunks run in parallel via HPX's thread pool), addressing your comment about sequential execution. Single tasks also use hpx::async for lightweight scheduling.

Is it okay to rely on hpx::async, hpx::future, and hpx::when_all for task scheduling and execution, given they’re part of HPX’s lightweight threading system? Or would you recommend something else?
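For illustration, the same fork/join pattern sketched with the standard library. bulk_sum is a hypothetical helper (it assumes chunks >= 1); hpx::async and hpx::when_all have the same shape but run on HPX's lightweight threads rather than OS threads:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

// std:: analogue of the hpx::async + when_all pattern described above.
// Each chunk runs as its own async task; waiting on all futures stands in
// for when_all(...) followed by the final set_value.
inline int bulk_sum(std::vector<int> const& data, std::size_t chunks)
{
    std::size_t const chunk = (data.size() + chunks - 1) / chunks;
    std::vector<std::future<int>> futures;
    for (std::size_t start = 0; start < data.size(); start += chunk)
    {
        std::size_t const end = std::min(start + chunk, data.size());
        futures.push_back(std::async(std::launch::async, [&data, start, end] {
            auto first = data.begin() + static_cast<std::ptrdiff_t>(start);
            auto last = data.begin() + static_cast<std::ptrdiff_t>(end);
            return std::accumulate(first, last, 0);
        }));
    }
    int total = 0;
    for (auto& f : futures)
        total += f.get();    // "when_all": done once every chunk completed
    return total;
}
```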

@charan-003 charan-003 marked this pull request as draft April 8, 2025 09:20
@charan-003
Contributor Author

charan-003 commented Apr 14, 2025

@hkaiser @isidorostsa
This version is about adding P2079R7 parallel_scheduler to HPX core

Implement parallel_scheduler wrapping thread_pool_policy_scheduler with hpx::launch, aligning with the P2079R7 user-facing API:

  • Add get_parallel_scheduler() for global access with async/sync policy selection.
  • Include parallel_sender with set_value_t(), set_stopped_t(), set_error_t(std::exception_ptr).
  • Support cancellation via stop token in operation_state::start_t.
  • Ensure parallel forward progress guarantee.
  • Add then operation for task chaining.
  • Add tests for progress, single task, cancellation, and exceptions.

The replaceability API and bulk operations are the next things I will keep working on.
Thank you for your support

@charan-003 charan-003 marked this pull request as ready for review April 16, 2025 12:45
#include <exception>
#include <utility>

// Forward declarations for execution::experimental
Contributor

why are those necessary?

Contributor Author

@charan-003 charan-003 Apr 19, 2025

Yes, those includes are necessary: <exception> provides std::exception_ptr for error handling, and <utility> gives std::move and std::forward. The forward declarations prevent a circular dependency while still allowing the types to be used.

Contributor

Ah I apologize, I meant the forward declarations declared in the comment

Contributor Author

Oh, it's not really necessary, just a declaration.

}

friend void tag_invoke(start_t, operation_state& op) noexcept
{
Contributor

I'm impressed that you are using tag_invoke correctly! However, P2300 has elected not to use tag_invoke anywhere, so you should make this a member function, and do the same for connect, then etc.

Contributor

Actually, @hkaiser, do you think we should follow along P2300 and avoid tag_invoke for future senders, or use it anyways for backwards compatibility with our stuff (not sure if there are any real issues)

Contributor Author

@charan-003 charan-003 Apr 19, 2025

True, but we were using tag_invoke in thread_pool_scheduler.hpp as well, so I used it here.

This framework relies on tag_invoke for operations like schedule. If this is not how we want it, I can update it to use member functions instead.

@hkaiser any inputs on this?

Member

Eventually, we need to switch to the non-tag_invoke implementation. Not sure if it's possible to do piecewise, though.
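For reference, the two spellings under discussion look roughly like this on toy types. start_t and both operation-state types are hypothetical; the real CPOs and operation states carry much more machinery:

```cpp
#include <cassert>

// Old style: a hidden friend found by ADL through a tag_invoke CPO.
struct start_t
{
};    // stands in for hpx::execution::experimental::start_t

struct tag_invoke_style_op
{
    bool* started;
    friend void tag_invoke(start_t, tag_invoke_style_op& op) noexcept
    {
        *op.started = true;
    }
};

// P2300 style: a plain member function, no tag_invoke indirection.
struct member_style_op
{
    bool* started;
    void start() & noexcept
    {
        *started = true;
    }
};
```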


#ifndef HPX_EXECUTION_EXPERIMENTAL_SENDER_T
#define HPX_EXECUTION_EXPERIMENTAL_SENDER_T
struct sender_t
Contributor

why is this necessary?

Contributor

Could you also please help me understand where the HPX_EXECUTION_EXPERIMENTAL_SENDER_T directive is coming from?

Contributor Author

The #ifndef HPX_EXECUTION_EXPERIMENTAL_SENDER_T guard prevents redefinition of sender_t, which marks sender types. It's a custom guard I defined to ensure sender_t is only defined once, keeping dependencies minimal.

Contributor

@isidorostsa isidorostsa Apr 19, 2025

I think an easier way to prevent redefinition is to define the tag in a centralized header that will be included in the translation units that use it. For example sender.hpp.

However, you might have observed the #if defined(HPX_HAVE_STDEXEC) clauses around HPX. We have stopped developing the sender/receiver primitives in HPX and have chosen to use the ones provided by the reference stdexec implementation provided by NVIDIA. This would include the sender_t, etc.

You can build HPX with STDEXEC on linux by defining the following build flags:

-DHPX_WITH_CXX_STANDARD=20
-DHPX_WITH_STDEXEC=ON

I apologize this was not communicated to you earlier

Contributor Author

@isidorostsa Just to confirm: should the code rely on STDEXEC?

Contributor

Yes, it should. Eventually this will be replaced with the native std implementation.

friend void tag_invoke(
ex::set_error_t, test_receiver&& r, std::exception_ptr ep) noexcept
{
(void) ep;
Contributor

This looks interesting, what is the reason behind it?

Contributor Author

This ensures errors (via std::exception_ptr) are passed to the receiver when a task fails. (void) ep; avoids an unused-parameter warning, since the test only records that an error occurred without inspecting ep.

Member

@hkaiser hkaiser Apr 28, 2025

Just FWIW, instead of suppressing warnings by writing (void) ep; we usually use [[maybe_unused]] std::exception_ptr ep
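A minimal side-by-side of the two spellings (both functions here are hypothetical examples, not HPX code):

```cpp
#include <cassert>
#include <exception>

// Cast-to-void: suppresses the warning, but reads like dead code.
inline void set_error_cast(std::exception_ptr ep) noexcept
{
    (void) ep;
}

// C++17 attribute: the intent is visible in the signature itself.
inline void set_error_attr([[maybe_unused]] std::exception_ptr ep) noexcept
{
}
```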

#endif

template <typename Scheduler>
struct parallel_sender;
Contributor

Should this maybe be nested inside the scheduler?

Contributor

Also I'm not sure I understand why the scheduler needs to be templated. Would we instantiate it with anything but parallel_scheduler?

Contributor Author

Umm, I'm keeping the sender types separate for flexibility, since P2079 is still being updated and they may be reused across scheduler types.

@charan-003
Contributor Author

@hkaiser @isidorostsa This version is using stdexec.

I'm not sure about the failing test case. Any hints on the failing cases?

@charan-003
Contributor Author

@hkaiser @isidorostsa
Does the current implementation, now relying on stdexec, meet our expectations? If so, I’ll proceed with implementing bulk support. Please confirm.

@charan-003 charan-003 force-pushed the p2079-section-4.1 branch from ba47be6 to 4e5c18c Compare July 9, 2025 00:23
Signed-off-by: Sai Charan Arvapally <[email protected]>
@charan-003 charan-003 force-pushed the p2079-section-4.1 branch 2 times, most recently from 6814725 to 25416ac Compare July 9, 2025 01:52
@hkaiser
Member

hkaiser commented Aug 23, 2025

@charan-003 please rebase your branch on top of master to resolve the merge conflicts.

@charan-003 charan-003 marked this pull request as draft August 23, 2025 17:18
@codacy-production

codacy-production bot commented Aug 23, 2025

Coverage summary from Codacy

See diff coverage on Codacy

Coverage variation: -86.27%

Coverage variation details
                                   Coverable lines     Covered lines      Coverage
Common ancestor commit (4633b7d)   263487              227318             86.27%
Head commit (161dfde)              60605 (-202882)     0 (-227318)        0.00% (-86.27%)

Coverage variation is the difference between the coverage for the head and common ancestor commits of the pull request branch: <coverage of head commit> - <coverage of common ancestor commit>

Diff coverage details
                       Coverable lines    Covered lines    Diff coverage
Pull request (#6655)   0                  0                ∅ (not applicable)

Diff coverage is the percentage of lines that are covered by tests out of the coverable lines that the pull request added or modified: <covered lines added or modified>/<coverable lines added or modified> * 100%



Labels

category: senders/receivers Implementations of the p0443r14 / p2300 + p1897 proposals type: compatibility issue type: enhancement


4 participants