Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
3e13add
updated algorithm_bulk test file
charan-003 Jul 18, 2025
d62a019
updated thread_pool_bulk
charan-003 Jul 19, 2025
1435011
implementing stdexec bulk in thread pool scheduler bulk v1
charan-003 Aug 2, 2025
e359e65
implementing stdexec bulk in thread pool scheduler bulk v2
charan-003 Aug 2, 2025
5c16588
implementing stdexec bulk in thread pool scheduler bulk v3
charan-003 Aug 2, 2025
b134e03
changing tag invoke to domain
charan-003 Aug 6, 2025
4b1074b
changing tag invoke to domain v2
charan-003 Aug 6, 2025
c3fcbda
changing tag invoke to domain v3
charan-003 Aug 7, 2025
9e6fe4a
implementing stdexec bulk and domain v1
charan-003 Aug 10, 2025
dfedfc8
clean up header dependencies
charan-003 Aug 12, 2025
b12bfb3
HPX thread pool with domain-based customization
charan-003 Aug 12, 2025
cae2eb4
Merge branch 'STEllAR-GROUP:master' into feature/forward-bulk
charan-003 Aug 13, 2025
ff8a35c
fixing minor errrors v1
Aug 13, 2025
e2de597
fixing errors 1
Aug 13, 2025
75409da
fixing minor errrors v2
Aug 13, 2025
e30e63f
fixing minor errrors v2.1
Aug 13, 2025
8c920f3
fixing minor errrors v3
Aug 14, 2025
0448223
keeping back the old version
Aug 15, 2025
cd232b3
keeping back the old version v1
Aug 15, 2025
cc5ea49
keeping back the old version v2
Aug 15, 2025
e3f489e
keeping back the old version v2.1
Aug 15, 2025
806f82c
keeping back the old version v3
Aug 16, 2025
7c8c0d2
fixing sender related errors v1
Aug 16, 2025
e95e281
fixing sender related errors v2
Aug 16, 2025
7febc0f
fixing sender related errors v3
Aug 16, 2025
d512cfb
fixing sender related errors v3.1
Aug 16, 2025
b362a07
fix continue on errors
Aug 16, 2025
89bc8e5
fix continue on errorss
Aug 16, 2025
3f3b523
fix errors
Aug 16, 2025
348ed1f
fix errors-1
Aug 16, 2025
bcba0d6
fixing macos errors
Aug 17, 2025
6ddf196
fixing macos errors thread pool scheduler
Aug 17, 2025
5443436
fix errrors
Aug 17, 2025
c0775b7
erors
Aug 19, 2025
22554b3
going bac
Aug 19, 2025
c88dbe9
fixing format
charan-003 Aug 19, 2025
b31dddb
macos errors
charan-003 Aug 19, 2025
4d1d90c
Merge branch 'master' into feature/forward-bulk
charan-003 Aug 19, 2025
318cafe
macos errors 1
charan-003 Aug 19, 2025
51a378e
t reset --hard HEAD~1
charan-003 Aug 19, 2025
15f19e6
macos errors.
charan-003 Aug 20, 2025
aa26777
macos errors..
charan-003 Aug 20, 2025
555f473
Merge commit 'aa26777503' into feature/forward-bulk
charan-003 Aug 20, 2025
581f49f
macos errors..
charan-003 Aug 20, 2025
03322c0
Merge remote-tracking branch 'upstream/master' into feature/forward-bulk
Aug 27, 2025
54bc90a
new changes
Aug 27, 2025
2472a9c
new changes -1
Aug 27, 2025
d0353fb
new changes 1
Aug 27, 2025
ecd9bb0
minor changes
Sep 1, 2025
61a9ccd
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 1, 2025
85e287e
minor changes
Sep 2, 2025
dc7653c
adding back stdexec execution polocies
Sep 2, 2025
78abc27
typo
Sep 2, 2025
701a695
Half-working changes from Sep-03 meeting
isidorostsa Sep 3, 2025
242b506
Rename transfer to continues on in tps test
isidorostsa Sep 6, 2025
c714324
Comment out test for the currently unimplemented bulk_chunked
isidorostsa Sep 6, 2025
961f198
Revert thread_pool_scheduler to use ranges,
isidorostsa Sep 6, 2025
f1625e4
Use ranges::range_value_t instead of hpx::iter_value<hpx::range_iter.…
isidorostsa Sep 6, 2025
a005ea4
Transform sender feeds an iota range to the thread_pool_bulk_sender f…
isidorostsa Sep 6, 2025
856bf42
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 6, 2025
515faa4
P3481R5 bulk execution compliance in HPX bulk
Sep 9, 2025
0d708d0
P3481R5 bulk execution compliance in HPX bulk .
Sep 9, 2025
6c8aea6
macos errors
Sep 9, 2025
269e2b3
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 10, 2025
77620dd
fix format
Sep 10, 2025
58c6952
Domain Transform Logic
Sep 10, 2025
4d65eeb
Domain Transform Logic fix format
Sep 10, 2025
73ae6fb
macos errrors
Sep 10, 2025
3119ef4
detect bulk as default implementation to bulk chunked
Sep 14, 2025
f3fc6fa
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 14, 2025
6ada3c8
revert back bulk_t
Sep 15, 2025
6a8fb0e
bulk_chunked uses f(start,end,...)
Sep 15, 2025
13b291e
remove bulk_t
Sep 15, 2025
e9eefdd
formatting
Sep 16, 2025
c8e8d9c
formatting fix
Sep 16, 2025
2f9b3b5
formatting fix arrow
Sep 16, 2025
b18a39a
minor fix
Sep 17, 2025
a568202
removing continue on
Sep 17, 2025
36ae7f4
format
Sep 17, 2025
e9685b2
Concept Check set_value got correct F
isidorostsa Sep 18, 2025
ab206b1
Revert "Concept Check set_value got correct F"
isidorostsa Sep 24, 2025
79f731c
old implementation of bulk
Oct 2, 2025
ee410cf
include header
Oct 2, 2025
e38f0a4
Merge branch 'master' into feature/forward-bulk
charan-003 Oct 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove those files from this PR, since another contributor is already working on them for the same purpose.

Original file line number Diff line number Diff line change
Expand Up @@ -542,53 +542,6 @@ namespace hpx::ranges {
HPX_FORWARD(ExPolicy, policy), hpx::util::begin(rng),
hpx::util::end(rng), HPX_MOVE(f), HPX_MOVE(proj));
}

#if defined(HPX_HAVE_STDEXEC)
// Sender algorithm support for stdexec integration
// Sender-pipeline overload of hpx::ranges::for_each: accepts an upstream
// sender whose completion value is a range, applies the parallel for_each
// algorithm to that range, and re-emits the algorithm's result via just().
//
// Constraint: Sender must model the sender concept and F must be invocable
// with the sender's value types (packed into a std::tuple).
//
// NOTE(review): hpx::for_each is invoked eagerly inside the let_value
// continuation and its result is wrapped in just() — i.e. the algorithm
// runs when the pipeline's value arrives, not lazily as a composed sender.
// Confirm this is the intended execution model for stdexec integration.
template <typename Sender, typename ExPolicy, typename F,
    typename Proj = hpx::identity>
// clang-format off
    requires (
        hpx::execution::experimental::sender<Sender> &&
        std::invocable<F,
            typename hpx::execution::experimental::value_types_of_t<
                Sender, hpx::execution::experimental::empty_env>::
                template apply<std::tuple>>
    )
// clang-format on
friend auto tag_fallback_invoke(hpx::ranges::for_each_t,
    Sender&& sender, ExPolicy&& policy, F&& f, Proj&& proj = Proj{})
{
    // Move policy, f, and proj into the continuation so they outlive
    // this call; rng is the value produced by the upstream sender.
    return HPX_FORWARD(Sender, sender) |
        hpx::execution::experimental::let_value(
            [policy = HPX_FORWARD(ExPolicy, policy),
                f = HPX_FORWARD(F, f),
                proj = HPX_FORWARD(Proj, proj)](auto&& rng) mutable {
                // Run the algorithm and forward its return value
                // downstream as the new sender value.
                return hpx::execution::experimental::just(hpx::for_each(
                    policy, HPX_FORWARD(decltype(rng), rng),
                    HPX_MOVE(f), HPX_MOVE(proj)));
            });
}

// Partial-application (pipeable) overload for stdexec senders:
// for_each(policy) returns a closure that can be composed with
// `sender | for_each(policy)`. The closure expects the upstream sender
// to complete with two values: the range and the callable.
//
// NOTE(review): unlike the full overload above, this path takes no
// projection and forwards only (rng, f) to hpx::for_each — confirm the
// two-value completion signature is what upstream senders produce here.
template <typename ExPolicy>
friend auto tag_fallback_invoke(
    hpx::ranges::for_each_t, ExPolicy&& policy)
{
    // Capture the policy by value in the returned adaptor so the
    // adaptor is safe to store and invoke later.
    return [policy = HPX_FORWARD(ExPolicy, policy)](
               auto&& sender) mutable {
        return HPX_FORWARD(decltype(sender), sender) |
            hpx::execution::experimental::let_value(
                [policy = HPX_MOVE(policy)](
                    auto&& rng, auto&& f) mutable {
                    // Eagerly run the algorithm on arrival of the
                    // upstream values and re-emit its result.
                    return hpx::execution::experimental::just(
                        hpx::for_each(policy,
                            HPX_FORWARD(decltype(rng), rng),
                            HPX_FORWARD(decltype(f), f)));
                });
    };
}
#endif
} for_each{};

///////////////////////////////////////////////////////////////////////////
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes here are not important

Original file line number Diff line number Diff line change
Expand Up @@ -508,15 +508,15 @@ namespace hpx::execution::experimental {
// The sender_of concept defines the requirements for a sender type that on
// successful completion sends the specified set of value types.
template <typename Sender, typename Signal,
typename Env = hpx::execution::experimental::env<>>
typename Env = hpx::execution::experimental::empty_env>
struct is_sender_of
: std::bool_constant<
hpx::execution::experimental::sender_of<Sender, Signal, Env>>
{
};

template <typename Sender, typename Signal,
typename Env = hpx::execution::experimental::env<>>
typename Env = hpx::execution::experimental::empty_env>
inline constexpr bool is_sender_of_v =
is_sender_of<Sender, Signal, Env>::value;

Expand All @@ -533,7 +533,7 @@ namespace hpx::execution::experimental {
// 3. Otherwise, single-sender-value-type<S, E> is ill-formed.
//
template <typename Sender,
typename Env = hpx::execution::experimental::env<>>
typename Env = hpx::execution::experimental::empty_env>
using single_sender_value_t =
hpx::execution::experimental::stdexec_internal::__single_sender_value_t<
Sender, Env>;
Expand Down Expand Up @@ -1008,9 +1008,8 @@ namespace hpx::execution::experimental {
value_or_void_t<Value>, std::exception_ptr>;

template <typename Promise>
using coroutine_env_t =
hpx::util::detected_or<hpx::execution::experimental::empty_env,
hpx::functional::tag_invoke_result_t, get_env_t, Promise>;
using coroutine_env_t = hpx::util::detected_or<exec_envs::empty_env,
hpx::functional::tag_invoke_result_t, get_env_t, Promise>;

template <typename Value>
struct receiver_base
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ namespace hpx::execution::experimental {
// Continue on
using stdexec::continue_on;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (134, 135) should be removed too, since the proper spelling is continues_on

using stdexec::continue_on_t;
// Backward compatibility alias for continues_on
inline constexpr stdexec::continue_on_t continues_on{};
using continues_on_t = stdexec::continue_on_t;

// Transfer just
using stdexec::transfer_just;
Expand Down Expand Up @@ -322,12 +319,12 @@ namespace hpx::execution::experimental {
}

using stdexec::__connect_awaitable_t;
} // namespace stdexec_internal

// Additional stdexec concepts and utilities needed for domain customization
using stdexec::__completes_on;
using stdexec::__starts_on;
using stdexec::sender_expr_for;
// Additional stdexec concepts and utilities needed for domain customization
using stdexec::__completes_on;
using stdexec::__starts_on;
using stdexec::sender_expr_for;
} // namespace stdexec_internal
} // namespace hpx::execution::experimental

// Leaving this as a placeholder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ namespace hpx::execution::experimental {
// Concept to match bulk sender types
// Note: We keep bulk_t handling as pragmatic workaround for stdexec template issues
template <typename Sender>
concept any_bulk_sender =
hpx::execution::experimental::sender_expr_for<Sender,
concept bulk_chunked_or_unchunked_sender =
hpx::execution::experimental::stdexec_internal::sender_expr_for<Sender,
hpx::execution::experimental::bulk_chunked_t> ||
hpx::execution::experimental::sender_expr_for<Sender,
hpx::execution::experimental::stdexec_internal::sender_expr_for<Sender,
hpx::execution::experimental::bulk_unchunked_t>;

// Domain customization for stdexec bulk operations
Expand All @@ -95,10 +95,10 @@ namespace hpx::execution::experimental {
{
// Unified transform_sender for all bulk operations without environment
// (completes_on pattern)
template <any_bulk_sender Sender>
template <bulk_chunked_or_unchunked_sender Sender>
auto transform_sender(Sender&& sndr) const noexcept
{
static_assert(hpx::execution::experimental::__completes_on<Sender,
static_assert(hpx::execution::experimental::stdexec_internal::__completes_on<Sender,
thread_pool_policy_scheduler<Policy>>,
"No thread_pool_policy_scheduler instance can be found in the "
"sender's "
Expand All @@ -115,7 +115,7 @@ namespace hpx::execution::experimental {

auto iota_shape = std::views::iota(decltype(shape){0}, shape);

if constexpr (hpx::execution::experimental::sender_expr_for<Sender,
if constexpr (hpx::execution::experimental::stdexec_internal::sender_expr_for<Sender,
hpx::execution::experimental::bulk_unchunked_t>)
{
// This should be launching one hpx thread for each index
Expand All @@ -131,7 +131,7 @@ namespace hpx::execution::experimental {
};
}
else if constexpr (
hpx::execution::experimental::sender_expr_for<Sender,
hpx::execution::experimental::stdexec_internal::sender_expr_for<Sender,
hpx::execution::experimental::bulk_chunked_t>)
{
// This should be launching one hpx thread for each chunk
Expand All @@ -150,10 +150,10 @@ namespace hpx::execution::experimental {

// Unified transform_sender for all bulk operations with environment
// (starts_on pattern)
template <any_bulk_sender Sender, typename Env>
template <bulk_chunked_or_unchunked_sender Sender, typename Env>
auto transform_sender(Sender&& sndr, const Env& env) const noexcept
{
static_assert(hpx::execution::experimental::__starts_on<Sender,
static_assert(hpx::execution::experimental::stdexec_internal::__starts_on<Sender,
thread_pool_policy_scheduler<Policy>, Env>,
"No thread_pool_policy_scheduler instance can be found in the "
"receiver's "
Expand All @@ -167,7 +167,7 @@ namespace hpx::execution::experimental {

auto iota_shape = std::views::iota(decltype(shape){0}, shape);

if constexpr (hpx::execution::experimental::sender_expr_for<Sender,
if constexpr (hpx::execution::experimental::stdexec_internal::sender_expr_for<Sender,
hpx::execution::experimental::bulk_unchunked_t>)
{
return hpx::execution::experimental::detail::
Expand All @@ -182,7 +182,7 @@ namespace hpx::execution::experimental {
};
}
else if constexpr (
hpx::execution::experimental::sender_expr_for<Sender,
hpx::execution::experimental::stdexec_internal::sender_expr_for<Sender,
hpx::execution::experimental::bulk_chunked_t>)
{
return hpx::execution::experimental::detail::
Expand Down Expand Up @@ -451,7 +451,6 @@ namespace hpx::execution::experimental {
{
return {s.scheduler, HPX_FORWARD(Receiver, receiver)};
}
#if defined(HPX_HAVE_STDEXEC)
struct env
{
std::decay_t<Scheduler> const& sched;
Expand All @@ -470,12 +469,14 @@ namespace hpx::execution::experimental {
return e.sched;
}

#if defined(HPX_HAVE_STDEXEC)
// Add domain query to sender environment
friend constexpr auto tag_invoke(
stdexec::get_domain_t, env const& e) noexcept
{
return stdexec::get_domain(e.sched);
}
#endif
};

friend constexpr auto tag_invoke(
Expand All @@ -498,35 +499,6 @@ namespace hpx::execution::experimental {
{
return s.scheduler;
}
#else
struct env_no_stdexec
{
thread_pool_policy_scheduler<Policy> sched;

friend constexpr auto tag_invoke(
hpx::execution::experimental::get_completion_scheduler_t<
hpx::execution::experimental::set_value_t>,
env_no_stdexec const& e) noexcept
{
return e.sched;
}

friend constexpr auto tag_invoke(
hpx::execution::experimental::get_completion_scheduler_t<
hpx::execution::experimental::set_stopped_t>,
env_no_stdexec const& e) noexcept
{
return e.sched;
}
};

friend constexpr auto tag_invoke(
hpx::execution::experimental::get_env_t,
sender const& s) noexcept
{
return env_no_stdexec{s.scheduler};
};
#endif
};

#if defined(HPX_HAVE_STDEXEC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ namespace hpx::execution::experimental::detail {
chunk_size *= 2;
}
return static_cast<std::uint32_t>(
(std::max) (chunk_size, std::uint64_t(1)));
(std::max)(chunk_size, std::uint64_t(1)));
}

// For bulk_unchunked: f(index, ...)
Expand All @@ -99,18 +99,8 @@ namespace hpx::execution::experimental::detail {
constexpr void bulk_scheduler_invoke_helper_chunked(
hpx::util::index_pack<Is...>, F&& f, Start&& start, End&& end, Ts& ts)
{
if constexpr (sizeof...(Is) == 0)
{
// No additional arguments from sender values
HPX_INVOKE(HPX_FORWARD(F, f), HPX_FORWARD(Start, start),
HPX_FORWARD(End, end));
}
else
{
// Additional arguments from sender values
HPX_INVOKE(HPX_FORWARD(F, f), HPX_FORWARD(Start, start),
HPX_FORWARD(End, end), hpx::get<Is>(ts)...);
}
HPX_INVOKE(HPX_FORWARD(F, f), HPX_FORWARD(Start, start),
HPX_FORWARD(End, end), hpx::get<Is>(ts)...);
}

inline hpx::threads::mask_type full_mask(
Expand Down Expand Up @@ -186,10 +176,8 @@ namespace hpx::execution::experimental::detail {
auto const i_begin =
static_cast<std::size_t>(index) * task_f->chunk_size;
auto const i_end =
(std::min) (i_begin + task_f->chunk_size, task_f->size);
(std::min)(i_begin + task_f->chunk_size, task_f->size);

auto it =
std::ranges::next(hpx::util::begin(op_state->shape), i_begin);
if constexpr (OperationState::is_chunked)
{
// bulk_chunked: f(start, end, values...)
Expand All @@ -199,6 +187,8 @@ namespace hpx::execution::experimental::detail {
else
{
// bulk_unchunked: f(index, values...) for each element
auto it = std::ranges::next(
hpx::util::begin(op_state->shape), i_begin);
for (std::uint32_t i = i_begin; i != i_end; (void) ++i)
{
bulk_scheduler_invoke_helper(
Expand Down Expand Up @@ -428,8 +418,8 @@ namespace hpx::execution::experimental::detail {
auto& queue = op_state->queues[worker_thread].data_;
auto const num_steps = size / num_threads + 1;
auto const part_begin = worker_thread;
auto part_end = (std::min) (size + num_threads - 1,
part_begin + num_steps * num_threads);
auto part_end = (std::min)(
size + num_threads - 1, part_begin + num_steps * num_threads);
auto const remainder = (part_end - part_begin) % num_threads;
if (remainder != 0)
{
Expand Down Expand Up @@ -502,7 +492,6 @@ namespace hpx::execution::experimental::detail {
{
// Don't spawn tasks if there is no work to be done
auto const size =
// static_cast<std::uint32_t>(op_state->shape);
static_cast<std::uint32_t>(hpx::util::size(op_state->shape));
if (size == 0)
{
Expand Down Expand Up @@ -644,7 +633,11 @@ namespace hpx::execution::experimental::detail {
}

// clang-format off
template <typename... Ts>
template <typename... Ts,
HPX_CONCEPT_REQUIRES_(
hpx::is_invocable_v<F, range_value_type,
std::add_lvalue_reference_t<Ts>...>
)>
// clang-format on
friend void tag_invoke(hpx::execution::experimental::set_value_t,
bulk_receiver&& r, Ts&&... ts) noexcept
Expand Down Expand Up @@ -906,6 +899,60 @@ namespace hpx::execution::experimental::detail {
};
} // namespace hpx::execution::experimental::detail

#if !defined(HPX_HAVE_STDEXEC)
// Legacy (non-stdexec) customization of the bulk CPO for the HPX thread
// pool scheduler. With stdexec enabled, bulk is instead customized through
// the domain mechanism (see thread_pool_scheduler.hpp), so this entire
// block is compiled out in that configuration.
namespace hpx::execution::experimental {

    // bulk over an arbitrary (non-integral) shape: dispatch each element
    // of `shape` through the scheduler's bulk machinery.
    //
    // For launch::sync_policy the work is not worth parallelizing, so we
    // fall back to the plain (inline) bulk_sender; otherwise we build a
    // thread_pool_bulk_sender that fans work out across worker threads.
    // clang-format off
    template <typename Policy, typename Sender, typename Shape, typename F,
        HPX_CONCEPT_REQUIRES_(
            !std::is_integral_v<Shape>
        )>
    // clang-format on
    constexpr auto tag_invoke(bulk_t,
        thread_pool_policy_scheduler<Policy> scheduler, Sender&& sender,
        Shape const& shape, F&& f)
    {
        if constexpr (std::is_same_v<Policy, launch::sync_policy>)
        {
            // fall back to non-bulk scheduling if sync execution was requested
            return detail::bulk_sender<Sender, Shape, F>{
                HPX_FORWARD(Sender, sender), shape, HPX_FORWARD(F, f)};
        }
        else
        {
            return detail::thread_pool_bulk_sender<Policy, Sender, Shape, F>{
                HPX_MOVE(scheduler), HPX_FORWARD(Sender, sender), shape,
                HPX_FORWARD(F, f)};
        }
    }

    // bulk over an integral count: wrap the count in a counting_shape so
    // both overloads funnel into the same shape-based sender types.
    // clang-format off
    template <typename Policy, typename Sender, typename Count, typename F,
        HPX_CONCEPT_REQUIRES_(
            std::is_integral_v<Count>
        )>
    // clang-format on
    constexpr decltype(auto) tag_invoke(bulk_t,
        thread_pool_policy_scheduler<Policy> scheduler, Sender&& sender,
        Count const& count, F&& f)
    {
        if constexpr (std::is_same_v<Policy, launch::sync_policy>)
        {
            // fall back to non-bulk scheduling if sync execution was requested
            return detail::bulk_sender<Sender, hpx::util::counting_shape<Count>,
                F>{HPX_FORWARD(Sender, sender),
                hpx::util::counting_shape(count), HPX_FORWARD(F, f)};
        }
        else
        {
            return detail::thread_pool_bulk_sender<Policy, Sender,
                hpx::util::counting_shape<Count>, F>{HPX_MOVE(scheduler),
                HPX_FORWARD(Sender, sender), hpx::util::counting_shape(count),
                HPX_FORWARD(F, f)};
        }
    }
}    // namespace hpx::execution::experimental
#endif

// Note: With stdexec integration, bulk operations are now customized
// through the domain system in thread_pool_scheduler.hpp rather than
// direct tag_invoke customizations. The thread_pool_domain<Policy>
Expand Down
Loading