Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
8b6714d
docs: Fix spelling in example dictionary
Mar 13, 2025
af52bcf
Updated config
Mar 13, 2025
51f8347
Revert "docs: Fix spelling in example dictionary"
Mar 24, 2025
4f6b1d3
Updated Config
Mar 24, 2025
bfa3444
Updated to check for all files
Mar 26, 2025
de86694
Update README.rst
hkaiser Mar 26, 2025
60b6256
parallel_scheduler for P2079 Section 4.1 User Facing API (ref #6601)
Mar 27, 2025
658835f
Fix formatting, resolve bulk_t include, and add test
Mar 28, 2025
ed18115
Fix include path to use hpx/execution/algorithms/bulk.hpp
Mar 30, 2025
3c11b9b
Update thread_pool_scheduler.hpp
charan-003 Mar 30, 2025
0bc8d53
Add null check and runtime init to parallel_scheduler to debug Circle…
Mar 31, 2025
02ececd
Refactor: move parallel_scheduler to separate header
Mar 31, 2025
345a957
Fix CircleCI errors: add parallel_scheduler.hpp to CMakeLists, apply …
Mar 31, 2025
8e161d1
Update parallel_scheduler.hpp includes
Mar 31, 2025
bbd6708
Fixed inspect and test.headers
Mar 31, 2025
45d6340
Update parallel_scheduler.hpp
Apr 1, 2025
df9130c
Update parallel_scheduler.hpp
Apr 1, 2025
9c31037
Update parallel_scheduler.hpp
Apr 1, 2025
23a00dd
use east const, make get_thread_pool constexpr, improve bulk concurre…
Apr 2, 2025
e9582eb
Add p2079 parallel_scheduler implementation to HPX
Apr 3, 2025
cb1383b
Fixed Clang_format
Apr 3, 2025
54b6648
Fixed Clang_format
Apr 3, 2025
b906b22
Fixed Clang format
Apr 3, 2025
9639ec5
Fixed Clang format
Apr 4, 2025
6ac6d57
Update parallel_scheduler.hpp
charan-003 Apr 4, 2025
b6b49c3
Fixed Clang format by disabling
Apr 4, 2025
e27a582
Implemented parallel_scheduler with HPX async:
Apr 8, 2025
6beec67
fix circular desp
Apr 8, 2025
ae7c560
/
Apr 8, 2025
ea447ab
Implement P2079R7 parallel_scheduler in HPX using wrapper
Apr 13, 2025
131de3b
Implement P2079R7 parallel_scheduler in HPX using wrapper
Apr 13, 2025
f20e1ec
fixing circular deps
Apr 14, 2025
c18a13c
fixed cmakelist models
Apr 14, 2025
4625086
added missing includes
Apr 14, 2025
9825fd6
formating the file
Apr 14, 2025
5ef544f
emoving the forward declaration of hpx::get_num_worker_threads()
Apr 14, 2025
fc6ab4c
fix format
Apr 14, 2025
fd3085b
Update code to rely on STDEXEC
Apr 28, 2025
4359d16
Add HPX_WITH_STDEXEC=ON
Apr 29, 2025
d0ad0dc
use STDEXEC
Apr 29, 2025
523902f
Add conditional stdexec support in parallel_scheduler.hpp
Apr 29, 2025
69ab80e
apply clang format
Apr 30, 2025
55b691e
Update
May 1, 2025
4eed5ba
exceptions removed
May 8, 2025
94403fc
revert guided_pool_executor.hpp
May 11, 2025
a7569b5
Merge branch 'STEllAR-GROUP:master' into p2079-section-4.1
charan-003 May 26, 2025
8752188
update thread_pool_scheduler implementing p2079
May 26, 2025
d2a017f
update thread_pool_scheduler implementing p2079
May 26, 2025
7d23ec2
update thread_pool_scheduler implementing p2079
May 27, 2025
cf3e025
git push origin p2079-section-4.1added bulk_unchunked functionality t…
Jun 10, 2025
fcd4100
test unchunked
Jun 10, 2025
cc07053
fixing parallel_scheduler
Jun 11, 2025
b80264f
fixing inplace_stop_token
Jun 11, 2025
92f0d64
User-Supplied Function Interface
Jun 17, 2025
158fc54
fix queue_depth error
Jun 17, 2025
47eb7a4
fixed thrad_pool_scheduler
Jun 18, 2025
530e6ea
files mismatched
Jun 18, 2025
5e32ee2
files mismatched
Jun 18, 2025
911ed88
change
Jun 18, 2025
276d215
missing includes
Jun 21, 2025
ac53aed
implemented user_facing_api in thread_pool_scheduler
Jun 29, 2025
8fab595
implemented user_facing_api in thread_pool_scheduler
Jun 29, 2025
667a4ba
fixing errors in thread_pool_scheduler
Jun 30, 2025
ef753c1
fixing compilation errors in thread_pool_scheduler
Jul 2, 2025
9c97818
implemented user_facing_api in thread_pool_scheduler
Jul 2, 2025
e77fdef
user facing api in HPX
Jul 2, 2025
df1b71c
user facing api in HPX
Jul 3, 2025
9b7fc2b
fix user facing api in HPX
Jul 3, 2025
bdd71b3
fixing test errors
Jul 5, 2025
90fc6cf
fixing test errors
Jul 5, 2025
25478f2
fixing errors
Jul 6, 2025
77ccc80
fixing errors
Jul 6, 2025
8bc5e66
fixing errors
Jul 6, 2025
56d0bc8
fixing errors
Jul 6, 2025
78c0e4e
fixing errors
Jul 6, 2025
d0a80b7
fixing errors
Jul 6, 2025
97c7a75
fixing errors in test file
Jul 7, 2025
b911100
fixing errors in test file
Jul 8, 2025
4e5c18c
adding getter function
Jul 8, 2025
25416ac
adding getter function
charan-003 Jul 9, 2025
d29c834
using launch_policy
charan-003 Jul 9, 2025
873d706
using launch_policy_1
charan-003 Jul 9, 2025
7025dfd
implementation of bulk_chunk_1
charan-003 Jul 11, 2025
161dfde
Merge remote-tracking branch 'upstream/master' into p2079-section-4.1
Aug 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/sphinx/releases/whats_new_1_9_0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ Closed pull requests
* :hpx-pr:`5985` - Re-add deprecated tag_policy_tag et.al. types that were removed in V1.8.1
* :hpx-pr:`5981` - docs: add docs for condition_variable.hpp
* :hpx-pr:`5980` - More work on execution::read
* :hpx-pr:`5979` - Remove support for clang-v8 and clang-v9, switch LSU clang-v13 to C++17
* :hpx-pr:`5979` - Unsupported clang-v8 and clang-v9, switch LSU clang-v13 to C++17
* :hpx-pr:`5977` - fix: Compilation errors for -std=c++17 builders
* :hpx-pr:`5975` - docs: fix & improve parallel algorithms documentation 5
* :hpx-pr:`5974` - [P2300] Adapt get completion signatures for awaitable senders
Expand Down
1 change: 1 addition & 0 deletions libs/core/executors/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ set(executors_headers
hpx/executors/limiting_executor.hpp
hpx/executors/parallel_executor_aggregated.hpp
hpx/executors/parallel_executor.hpp
hpx/executors/parallel_scheduler.hpp
hpx/executors/post.hpp
hpx/executors/restricted_thread_pool_executor.hpp
hpx/executors/scheduler_executor.hpp
Expand Down
270 changes: 270 additions & 0 deletions libs/core/executors/include/hpx/executors/parallel_scheduler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
// Copyright (c) 2025 Sai Charan Arvapally
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/async_base/launch_policy.hpp>
#include <hpx/execution_base/stdexec_forward.hpp>
#include <hpx/executors/thread_pool_scheduler.hpp>
#include <hpx/executors/thread_pool_scheduler_bulk.hpp> // Added for P2079R10 compliance: include bulk_t
#include <hpx/threading_base/detail/get_default_pool.hpp>
#include <exception>
#include <memory>

#if !defined(HPX_HAVE_STDEXEC)
#include <hpx/execution/queries/get_stop_token.hpp>
#include <hpx/synchronization/stop_token.hpp>
#endif

namespace hpx::execution::experimental {

// Added for P2079R10 compliance: bulk_chunked_t tag
struct bulk_chunked_t
{
};

namespace detail {
// Singleton-like shared thread pool for parallel_scheduler
inline hpx::threads::thread_pool_base* get_default_parallel_pool()
{
// clang-format off
static hpx::threads::thread_pool_base* default_pool =
hpx::threads::detail::get_self_or_default_pool();
// clang-format on
return default_pool;
}
} // namespace detail

// Forward declarations
class parallel_scheduler;
struct parallel_scheduler_sender;

// P2079R10 parallel_scheduler implementation
class parallel_scheduler
{
public:
// Deleted default constructor
parallel_scheduler() = delete;

// Constructor from thread_pool_policy_scheduler
explicit parallel_scheduler(
thread_pool_policy_scheduler<hpx::launch> sched) noexcept
: scheduler_(sched)
{
}

// Copy constructor
parallel_scheduler(parallel_scheduler const& other) noexcept
: scheduler_(other.scheduler_)
{
}

// Move constructor
parallel_scheduler(parallel_scheduler&& other) noexcept
: scheduler_(HPX_MOVE(other.scheduler_))
{
}

// Copy assignment
parallel_scheduler& operator=(parallel_scheduler const& other) noexcept
{
if (this != &other)
{
scheduler_ = other.scheduler_;
}
return *this;
}

// Move assignment
parallel_scheduler& operator=(parallel_scheduler&& other) noexcept
{
if (this != &other)
{
scheduler_ = HPX_MOVE(other.scheduler_);
}
return *this;
}

// Equality comparison
friend constexpr bool operator==(parallel_scheduler const& lhs,
parallel_scheduler const& rhs) noexcept
{
return lhs.scheduler_ == rhs.scheduler_;
}

// Query for forward progress guarantee
friend constexpr forward_progress_guarantee tag_invoke(
get_forward_progress_guarantee_t,
[[maybe_unused]] parallel_scheduler const&) noexcept
{
return forward_progress_guarantee::parallel;
}

// Schedule method returning a sender
friend parallel_scheduler_sender tag_invoke(
schedule_t, parallel_scheduler const& sched) noexcept;

// Support get_completion_scheduler for scheduler concept
template <typename CPO>
friend auto tag_invoke(get_completion_scheduler_t<CPO>,
[[maybe_unused]] parallel_scheduler const& sched) noexcept
-> std::enable_if_t<hpx::meta::value<hpx::meta::one_of<CPO,
set_value_t, set_stopped_t>>,
parallel_scheduler const&>
{
return sched;
}

// Friend declaration to allow parallel_scheduler_sender access
friend struct parallel_scheduler_sender;

// Public getter for the underlying scheduler
thread_pool_policy_scheduler<hpx::launch> const&
get_underlying_scheduler() const noexcept
{
return scheduler_;
}

private:
thread_pool_policy_scheduler<hpx::launch> scheduler_;
};

// Sender for parallel_scheduler
struct parallel_scheduler_sender
{
parallel_scheduler scheduler;
#if defined(HPX_HAVE_STDEXEC)
using sender_concept = hpx::execution::experimental::sender_t;
using completion_signatures =
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_value_t(),
hpx::execution::experimental::set_error_t(std::exception_ptr),
hpx::execution::experimental::set_stopped_t()>;

template <typename Env>
friend auto tag_invoke(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
friend auto tag_invoke(
friend constexpr auto tag_invoke(

hpx::execution::experimental::get_completion_signatures_t,
parallel_scheduler_sender const&, Env) noexcept
-> completion_signatures;
#else
// Fallback types when stdexec is not available
using sender_concept = void; // Minimal fallback to allow compilation
struct completion_signatures
{
}; // Empty struct as placeholder
#endif

template <typename Receiver>
friend auto tag_invoke(
connect_t, parallel_scheduler_sender const& s, Receiver&& receiver)
{
// clang-format off
return thread_pool_policy_scheduler<hpx::launch>::
operation_state<
thread_pool_policy_scheduler<hpx::launch>,
Receiver>{
s.scheduler.get_underlying_scheduler(),
HPX_FORWARD(Receiver, receiver)};
// clang-format on
}

template <typename Receiver>
friend auto tag_invoke(
connect_t, parallel_scheduler_sender&& s, Receiver&& receiver)
{
// clang-format off
return thread_pool_policy_scheduler<hpx::launch>::
operation_state<
thread_pool_policy_scheduler<hpx::launch>,
Receiver>{
s.scheduler.get_underlying_scheduler(),
HPX_FORWARD(Receiver, receiver)};
// clang-format on
}

template <typename Receiver>
friend auto tag_invoke(
connect_t, parallel_scheduler_sender& s, Receiver&& receiver)
{
// clang-format off
return thread_pool_policy_scheduler<hpx::launch>::
operation_state<
thread_pool_policy_scheduler<hpx::launch>,
Receiver>{
s.scheduler.get_underlying_scheduler(),
HPX_FORWARD(Receiver, receiver)};
// clang-format on
}

struct env
{
parallel_scheduler const& sched;
template <typename CPO>
friend auto tag_invoke(
hpx::execution::experimental::get_completion_scheduler_t<CPO>,
env const& e) noexcept
-> std::enable_if_t<hpx::meta::value<hpx::meta::one_of<CPO,
set_value_t, set_stopped_t>>,
parallel_scheduler const&>
{
return e.sched;
}
};

friend env tag_invoke(hpx::execution::experimental::get_env_t,
parallel_scheduler_sender const& s) noexcept
{
return {s.scheduler};
}
};

// Define schedule_t tag_invoke after parallel_scheduler_sender
inline parallel_scheduler_sender tag_invoke(
schedule_t, parallel_scheduler const& sched) noexcept
{
return {sched};
}

// Added for P2079R10 compliance: bulk_chunked customization
template <typename Sender, typename Policy, typename Shape, typename F>
auto tag_invoke(bulk_chunked_t, parallel_scheduler scheduler,
Sender&& sender, Shape const& shape, F&& f)
{
return tag_invoke(bulk_t{}, scheduler.get_underlying_scheduler(),
HPX_FORWARD(Sender, sender), shape, HPX_FORWARD(F, f));
}

// Stream output operator for parallel_scheduler
inline std::ostream& operator<<(std::ostream& os, const parallel_scheduler&)
{
return os << "parallel_scheduler";
}

// P2079R10 get_parallel_scheduler function
inline parallel_scheduler get_parallel_scheduler()
{
// Use the default thread pool with async policy for parallel execution
auto pool = detail::get_default_parallel_pool();
if (!pool)
{
// clang-format off
std::terminate(); // As per P2079R10, terminate if backend is unavailable
// clang-format on
}
return parallel_scheduler(thread_pool_policy_scheduler<hpx::launch>(
pool, hpx::launch::async));
}

} // namespace hpx::execution::experimental

namespace hpx::execution::experimental::system_context_replaceability {
struct receiver_proxy;
struct bulk_item_receiver_proxy;
struct parallel_scheduler_backend;

std::shared_ptr<parallel_scheduler_backend>
query_parallel_scheduler_backend();
} // namespace hpx::execution::experimental::system_context_replaceability
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@
#include <hpx/topology/cpu_mask.hpp>
#include <hpx/type_support/pack.hpp>

// Added for stop token support
#if defined(HPX_HAVE_STDEXEC)
#include <hpx/execution/queries/get_stop_token.hpp>
#include <hpx/synchronization/stop_token.hpp>
#endif

#include <algorithm>
#include <atomic>
#include <cstddef>
Expand Down Expand Up @@ -75,6 +81,14 @@ namespace hpx::execution::experimental::detail {
HPX_INVOKE(HPX_FORWARD(F, f), HPX_FORWARD(T, t), hpx::get<Is>(ts)...);
}

// Modified to support range-based invocation for P2079R10
template <std::size_t... Is, typename F, typename Ts>
constexpr void bulk_scheduler_invoke_helper(hpx::util::index_pack<Is...>,
F&& f, std::size_t begin, std::size_t end, Ts& ts)
{
HPX_INVOKE(HPX_FORWARD(F, f), begin, end, hpx::get<Is>(ts)...);
}

inline hpx::threads::mask_type full_mask(
std::size_t first_thread, std::size_t num_threads)
{
Expand Down Expand Up @@ -131,8 +145,9 @@ namespace hpx::execution::experimental::detail {
}

private:
// Perform the work in one element indexed by index. The index
// represents a range of indices (iterators) in the given shape.
// Perform the work in one chunk indexed by index. The index
// represents a range of indices in the given shape.
// Modified for P2079R10 compliance: Invoke f with [begin, end] range
template <typename Ts>
void do_work_chunk(Ts& ts, std::uint32_t const index) const
{
Expand All @@ -149,12 +164,8 @@ namespace hpx::execution::experimental::detail {
auto const i_end =
(std::min) (i_begin + task_f->chunk_size, task_f->size);

auto it = std::next(hpx::util::begin(op_state->shape), i_begin);
for (std::uint32_t i = i_begin; i != i_end; (void) ++it, ++i)
{
bulk_scheduler_invoke_helper(
index_pack_type{}, op_state->f, *it, ts);
}
bulk_scheduler_invoke_helper(
index_pack_type{}, op_state->f, i_begin, i_end, ts);
}

template <hpx::concurrency::detail::queue_end Which, typename Ts>
Expand Down Expand Up @@ -259,8 +270,19 @@ namespace hpx::execution::experimental::detail {
bool allow_stealing;

// Visit the values sent by the predecessor sender.
// Modified for P2079R10 compliance: Check stop token before processing
void do_work() const
{
// Check stop token before starting work
if (auto stop_token = hpx::execution::experimental::get_stop_token(
op_state->receiver);
stop_token.stop_requested())
{
hpx::execution::experimental::set_stopped(
HPX_MOVE(op_state->receiver));
return;
}

auto visitor =
set_value_loop_visitor<OperationState>{op_state, this};
hpx::visit(HPX_MOVE(visitor), op_state->ts);
Expand Down
1 change: 1 addition & 0 deletions libs/core/executors/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(tests
parallel_scheduler_test
annotating_executor
annotation_property
created_executor
Expand Down
Loading
Loading