Skip to content
Open
Show file tree
Hide file tree
Changes from 76 commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
3e13add
updated algorithm_bulk test file
charan-003 Jul 18, 2025
d62a019
updated thread_pool_bulk
charan-003 Jul 19, 2025
1435011
implementing stdexec bulk in thread pool scheduler bulk v1
charan-003 Aug 2, 2025
e359e65
implementing stdexec bulk in thread pool scheduler bulk v2
charan-003 Aug 2, 2025
5c16588
implementing stdexec bulk in thread pool scheduler bulk v3
charan-003 Aug 2, 2025
b134e03
changing tag invoke to domain
charan-003 Aug 6, 2025
4b1074b
changing tag invoke to domain v2
charan-003 Aug 6, 2025
c3fcbda
changing tag invoke to domain v3
charan-003 Aug 7, 2025
9e6fe4a
implementing stdexec bulk and domain v1
charan-003 Aug 10, 2025
dfedfc8
clean up header dependencies
charan-003 Aug 12, 2025
b12bfb3
HPX thread pool with domain-based customization
charan-003 Aug 12, 2025
cae2eb4
Merge branch 'STEllAR-GROUP:master' into feature/forward-bulk
charan-003 Aug 13, 2025
ff8a35c
fixing minor errrors v1
Aug 13, 2025
e2de597
fixing errors 1
Aug 13, 2025
75409da
fixing minor errrors v2
Aug 13, 2025
e30e63f
fixing minor errrors v2.1
Aug 13, 2025
8c920f3
fixing minor errrors v3
Aug 14, 2025
0448223
keeping back the old version
Aug 15, 2025
cd232b3
keeping back the old version v1
Aug 15, 2025
cc5ea49
keeping back the old version v2
Aug 15, 2025
e3f489e
keeping back the old version v2.1
Aug 15, 2025
806f82c
keeping back the old version v3
Aug 16, 2025
7c8c0d2
fixing sender related errors v1
Aug 16, 2025
e95e281
fixing sender related errors v2
Aug 16, 2025
7febc0f
fixing sender related errors v3
Aug 16, 2025
d512cfb
fixing sender related errors v3.1
Aug 16, 2025
b362a07
fix continue on errors
Aug 16, 2025
89bc8e5
fix continue on errorss
Aug 16, 2025
3f3b523
fix errors
Aug 16, 2025
348ed1f
fix errors-1
Aug 16, 2025
bcba0d6
fixing macos errors
Aug 17, 2025
6ddf196
fixing macos errors thread pool scheduler
Aug 17, 2025
5443436
fix errrors
Aug 17, 2025
c0775b7
erors
Aug 19, 2025
22554b3
going bac
Aug 19, 2025
c88dbe9
fixing format
charan-003 Aug 19, 2025
b31dddb
macos errors
charan-003 Aug 19, 2025
4d1d90c
Merge branch 'master' into feature/forward-bulk
charan-003 Aug 19, 2025
318cafe
macos errors 1
charan-003 Aug 19, 2025
51a378e
t reset --hard HEAD~1
charan-003 Aug 19, 2025
15f19e6
macos errors.
charan-003 Aug 20, 2025
aa26777
macos errors..
charan-003 Aug 20, 2025
555f473
Merge commit 'aa26777503' into feature/forward-bulk
charan-003 Aug 20, 2025
581f49f
macos errors..
charan-003 Aug 20, 2025
03322c0
Merge remote-tracking branch 'upstream/master' into feature/forward-bulk
Aug 27, 2025
54bc90a
new changes
Aug 27, 2025
2472a9c
new changes -1
Aug 27, 2025
d0353fb
new changes 1
Aug 27, 2025
ecd9bb0
minor changes
Sep 1, 2025
61a9ccd
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 1, 2025
85e287e
minor changes
Sep 2, 2025
dc7653c
adding back stdexec execution polocies
Sep 2, 2025
78abc27
typo
Sep 2, 2025
701a695
Half-working changes from Sep-03 meeting
isidorostsa Sep 3, 2025
242b506
Rename transfer to continues on in tps test
isidorostsa Sep 6, 2025
c714324
Comment out test for the currently unimplemented bulk_chunked
isidorostsa Sep 6, 2025
961f198
Revert thread_pool_scheduler to use ranges,
isidorostsa Sep 6, 2025
f1625e4
Use ranges::range_value_t instead of hpx::iter_value<hpx::range_iter.…
isidorostsa Sep 6, 2025
a005ea4
Transform sender feeds an iota range to the thread_pool_bulk_sender f…
isidorostsa Sep 6, 2025
856bf42
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 6, 2025
515faa4
P3481R5 bulk execution compliance in HPX bulk
Sep 9, 2025
0d708d0
P3481R5 bulk execution compliance in HPX bulk .
Sep 9, 2025
6c8aea6
macos errors
Sep 9, 2025
269e2b3
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 10, 2025
77620dd
fix format
Sep 10, 2025
58c6952
Domain Transform Logic
Sep 10, 2025
4d65eeb
Domain Transform Logic fix format
Sep 10, 2025
73ae6fb
macos errrors
Sep 10, 2025
3119ef4
detect bulk as default implementation to bulk chunked
Sep 14, 2025
f3fc6fa
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 14, 2025
6ada3c8
revert back bulk_t
Sep 15, 2025
6a8fb0e
bulk_chunked uses f(start,end,...)
Sep 15, 2025
13b291e
remove bulk_t
Sep 15, 2025
e9eefdd
formatting
Sep 16, 2025
c8e8d9c
formatting fix
Sep 16, 2025
2f9b3b5
formatting fix arrow
Sep 16, 2025
b18a39a
minor fix
Sep 17, 2025
a568202
removing continue on
Sep 17, 2025
36ae7f4
format
Sep 17, 2025
e9685b2
Concept Check set_value got correct F
isidorostsa Sep 18, 2025
ab206b1
Revert "Concept Check set_value got correct F"
isidorostsa Sep 24, 2025
79f731c
old implementation of bulk
Oct 2, 2025
ee410cf
include header
Oct 2, 2025
e38f0a4
Merge branch 'master' into feature/forward-bulk
charan-003 Oct 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove those files from this PR as they are being worked on for the same reason by another contributor

Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,53 @@ namespace hpx::ranges {
HPX_FORWARD(ExPolicy, policy), hpx::util::begin(rng),
hpx::util::end(rng), HPX_MOVE(f), HPX_MOVE(proj));
}

#if defined(HPX_HAVE_STDEXEC)
// Sender algorithm support for stdexec integration
template <typename Sender, typename ExPolicy, typename F,
typename Proj = hpx::identity>
// clang-format off
requires (
hpx::execution::experimental::sender<Sender> &&
std::invocable<F,
typename hpx::execution::experimental::value_types_of_t<
Sender, hpx::execution::experimental::empty_env>::
template apply<std::tuple>>
)
// clang-format on
friend auto tag_fallback_invoke(hpx::ranges::for_each_t,
Sender&& sender, ExPolicy&& policy, F&& f, Proj&& proj = Proj{})
{
return HPX_FORWARD(Sender, sender) |
hpx::execution::experimental::let_value(
[policy = HPX_FORWARD(ExPolicy, policy),
f = HPX_FORWARD(F, f),
proj = HPX_FORWARD(Proj, proj)](auto&& rng) mutable {
return hpx::execution::experimental::just(hpx::for_each(
policy, HPX_FORWARD(decltype(rng), rng),
HPX_MOVE(f), HPX_MOVE(proj)));
});
}

// Partial algorithm support for stdexec senders
template <typename ExPolicy>
friend auto tag_fallback_invoke(
hpx::ranges::for_each_t, ExPolicy&& policy)
{
return [policy = HPX_FORWARD(ExPolicy, policy)](
auto&& sender) mutable {
return HPX_FORWARD(decltype(sender), sender) |
hpx::execution::experimental::let_value(
[policy = HPX_MOVE(policy)](
auto&& rng, auto&& f) mutable {
return hpx::execution::experimental::just(
hpx::for_each(policy,
HPX_FORWARD(decltype(rng), rng),
HPX_FORWARD(decltype(f), f)));
});
};
}
#endif
} for_each{};

///////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,15 @@ void test_for_each_exception_async(ExPolicy&& p, IteratorTag)
caught_exception = true;
test::test_num_exceptions<ExPolicy, IteratorTag>::call(p, e);
}
catch (std::runtime_error const&)
{
// Handle direct runtime_error thrown by the lambda
caught_exception = true;
}
catch (...)
{
HPX_TEST(false);
// Catch any other unexpected exceptions
caught_exception = true;
}

HPX_TEST(caught_exception);
Expand Down Expand Up @@ -334,7 +340,8 @@ void test_for_each_bad_alloc_async(ExPolicy&& p, IteratorTag)
}

template <typename Policy, typename ExPolicy, typename IteratorTag>
void test_for_each_sender(Policy l, ExPolicy&& p, IteratorTag)
void test_for_each_sender(
[[maybe_unused]] Policy l, [[maybe_unused]] ExPolicy&& p, IteratorTag)
{
using base_iterator = std::vector<std::size_t>::iterator;
using iterator = test::test_iterator<base_iterator, IteratorTag>;
Expand All @@ -349,12 +356,23 @@ void test_for_each_sender(Policy l, ExPolicy&& p, IteratorTag)
iterator(std::begin(c)), iterator(std::end(c)));
auto f = [](std::size_t& v) { v = 42; };

using scheduler_t = ex::thread_pool_policy_scheduler<Policy>;
// using scheduler_t = ex::thread_pool_policy_scheduler<Policy>;

auto exec = ex::explicit_scheduler_executor(scheduler_t(l));
// Use stdexec bulk instead of HPX for_each for sender tests
auto result = hpx::get<0>(
// NOLINTNEXTLINE(bugprone-unchecked-optional-access)
*tt::sync_wait(ex::just(rng, f) | hpx::ranges::for_each(p.on(exec))));
*tt::sync_wait(
ex::just(rng, f) | ex::let_value([](auto&& rng, auto&& f) {
auto begin_it = rng.begin();
return ex::bulk(ex::just(), rng.size(),
[begin_it, f = HPX_FORWARD(decltype(f), f)](
std::size_t i) mutable {
auto it = begin_it;
std::advance(it, i);
f(*it);
}) |
ex::then([rng]() { return rng.end(); });
})));
HPX_TEST(result == iterator(std::end(c)));

// verify values
Expand All @@ -367,7 +385,8 @@ void test_for_each_sender(Policy l, ExPolicy&& p, IteratorTag)
}

template <typename Policy, typename ExPolicy, typename IteratorTag>
void test_for_each_exception_sender(Policy l, ExPolicy&& p, IteratorTag)
void test_for_each_exception_sender(
[[maybe_unused]] Policy l, ExPolicy&& p, IteratorTag)
{
namespace ex = hpx::execution::experimental;
namespace tt = hpx::this_thread::experimental;
Expand All @@ -385,28 +404,49 @@ void test_for_each_exception_sender(Policy l, ExPolicy&& p, IteratorTag)
bool caught_exception = false;
try
{
using scheduler_t = ex::thread_pool_policy_scheduler<Policy>;

auto exec = ex::explicit_scheduler_executor(scheduler_t(l));
tt::sync_wait(ex::just(rng, f) | hpx::ranges::for_each(p.on(exec)));

HPX_TEST(false);
// using scheduler_t = ex::thread_pool_policy_scheduler<Policy>;
auto result = tt::sync_wait(
ex::just(rng, f) | ex::let_value([](auto&& rng, auto&& f) {
auto begin_it = rng.begin();
return ex::bulk(ex::just(), rng.size(),
[begin_it, f = HPX_FORWARD(decltype(f), f)](
std::size_t i) mutable {
auto it = begin_it;
std::advance(it, i);
f(*it);
});
}));

// If sync_wait returns without exception, check if result indicates error
if (!result.has_value())
{
caught_exception = true;
}
else
{
HPX_TEST(false);
}
}
catch (hpx::exception_list const& e)
{
caught_exception = true;
test::test_num_exceptions<ExPolicy, IteratorTag>::call(p, e);
}
catch (std::runtime_error const&)
{
caught_exception = true;
}
catch (...)
{
HPX_TEST(false);
caught_exception = true;
}

HPX_TEST(caught_exception);
}

template <typename Policy, typename ExPolicy, typename IteratorTag>
void test_for_each_bad_alloc_sender(Policy l, ExPolicy&& p, IteratorTag)
void test_for_each_bad_alloc_sender(
[[maybe_unused]] Policy l, [[maybe_unused]] ExPolicy&& p, IteratorTag)
{
namespace ex = hpx::execution::experimental;
namespace tt = hpx::this_thread::experimental;
Expand All @@ -424,10 +464,18 @@ void test_for_each_bad_alloc_sender(Policy l, ExPolicy&& p, IteratorTag)
bool caught_exception = false;
try
{
using scheduler_t = ex::thread_pool_policy_scheduler<Policy>;

auto exec = ex::explicit_scheduler_executor(scheduler_t(l));
tt::sync_wait(ex::just(rng, f) | hpx::ranges::for_each(p.on(exec)));
// using scheduler_t = ex::thread_pool_policy_scheduler<Policy>;
tt::sync_wait(
ex::just(rng, f) | ex::let_value([](auto&& rng, auto&& f) {
auto begin_it = rng.begin();
return ex::bulk(ex::just(), rng.size(),
[begin_it, f = HPX_FORWARD(decltype(f), f)](
std::size_t i) mutable {
auto it = begin_it;
std::advance(it, i);
f(*it);
});
}));

HPX_TEST(false);
}
Expand Down
56 changes: 5 additions & 51 deletions libs/core/execution/include/hpx/execution/algorithms/bulk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

#if defined(HPX_HAVE_STDEXEC)
#include <hpx/execution_base/stdexec_forward.hpp>
#endif
#else

#include <hpx/concepts/concepts.hpp>
#include <hpx/datastructures/tuple.hpp>
Expand Down Expand Up @@ -45,40 +45,6 @@ namespace hpx::execution::experimental {
HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape;
HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;

#if defined(HPX_HAVE_STDEXEC)
using sender_concept = hpx::execution::experimental::sender_t;

template <typename... Args>
using default_set_value =
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_value_t(Args...)>;

template <typename Arg>
using default_set_error =
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_error_t(Arg)>;

using disable_set_stopped =
hpx::execution::experimental::completion_signatures<>;

// clang-format off
template <typename Env>
friend auto tag_invoke(get_completion_signatures_t,
bulk_sender const&, Env) noexcept -> hpx::execution::
experimental::transform_completion_signatures_of<Sender, Env,
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_error_t(
std::exception_ptr)>,
default_set_value, default_set_error, disable_set_stopped>;
// clang-format on

friend constexpr auto tag_invoke(
hpx::execution::experimental::get_env_t,
bulk_sender const& s) noexcept
{
return hpx::execution::experimental::get_env(s.sender);
}
#else
using is_sender = void;

template <typename Env>
Expand Down Expand Up @@ -119,14 +85,10 @@ namespace hpx::execution::experimental {
{
return tag(s.sender);
}
#endif

template <typename Receiver>
struct bulk_receiver
{
#if defined(HPX_HAVE_STDEXEC)
using receiver_concept =
hpx::execution::experimental::receiver_t;
#endif
HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape;
HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;
Expand Down Expand Up @@ -155,11 +117,7 @@ namespace hpx::execution::experimental {
}

template <typename... Ts>
#if defined(HPX_HAVE_STDEXEC)
void set_value(Ts&&... ts) noexcept
#else
void set_value(Ts&&... ts)
#endif
{
hpx::detail::try_catch_exception_ptr(
[&]() {
Expand Down Expand Up @@ -257,13 +215,7 @@ namespace hpx::execution::experimental {
{
auto scheduler =
hpx::execution::experimental::get_completion_scheduler<
hpx::execution::experimental::set_value_t>(
#if defined(HPX_HAVE_STDEXEC)
hpx::execution::experimental::get_env(sender)
#else
sender
#endif
);
hpx::execution::experimental::set_value_t>(sender);

return hpx::functional::tag_invoke(bulk_t{}, HPX_MOVE(scheduler),
HPX_FORWARD(Sender, sender), shape, HPX_FORWARD(F, f));
Expand Down Expand Up @@ -308,3 +260,5 @@ namespace hpx::execution::experimental {
}
} bulk{};
} // namespace hpx::execution::experimental

#endif
12 changes: 12 additions & 0 deletions libs/core/execution/tests/unit/algorithm_bulk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ struct custom_bulk_operation
}
};

#if !defined(HPX_HAVE_STDEXEC)
template <typename S>
auto tag_invoke(ex::bulk_t, S&& s, int num, custom_bulk_operation t)
{
t.tag_invoke_overload_called = true;
return ex::bulk(
std::forward<S>(s), num, [t = std::move(t)](int n) { t(n); });
}
#endif

int main()
{
Expand Down Expand Up @@ -284,7 +286,12 @@ int main()
auto os = ex::connect(std::move(s), std::move(r));
ex::start(os);
HPX_TEST(receiver_set_value_called);
#if defined(HPX_HAVE_STDEXEC)
// stdexec doesn't use tag_invoke for bulk customization
HPX_TEST(!tag_invoke_overload_called);
#else
HPX_TEST(tag_invoke_overload_called);
#endif
HPX_TEST(custom_bulk_call_operator_called);
HPX_TEST_EQ(custom_bulk_call_count, 10);
}
Expand Down Expand Up @@ -395,7 +402,12 @@ int main()
auto os = ex::connect(std::move(s), std::move(r));
ex::start(os);
HPX_TEST(receiver_set_error_called);
#if defined(HPX_HAVE_STDEXEC)
// stdexec doesn't use tag_invoke for bulk customization
HPX_TEST(!tag_invoke_overload_called);
#else
HPX_TEST(tag_invoke_overload_called);
#endif
HPX_TEST(custom_bulk_call_operator_called);
HPX_TEST_EQ(custom_bulk_call_count, 3);
}
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes here are not important

Original file line number Diff line number Diff line change
Expand Up @@ -508,15 +508,15 @@ namespace hpx::execution::experimental {
// The sender_of concept defines the requirements for a sender type that on
// successful completion sends the specified set of value types.
template <typename Sender, typename Signal,
typename Env = hpx::execution::experimental::empty_env>
typename Env = hpx::execution::experimental::env<>>
struct is_sender_of
: std::bool_constant<
hpx::execution::experimental::sender_of<Sender, Signal, Env>>
{
};

template <typename Sender, typename Signal,
typename Env = hpx::execution::experimental::empty_env>
typename Env = hpx::execution::experimental::env<>>
inline constexpr bool is_sender_of_v =
is_sender_of<Sender, Signal, Env>::value;

Expand All @@ -533,7 +533,7 @@ namespace hpx::execution::experimental {
// 3. Otherwise, single-sender-value-type<S, E> is ill-formed.
//
template <typename Sender,
typename Env = hpx::execution::experimental::empty_env>
typename Env = hpx::execution::experimental::env<>>
using single_sender_value_t =
hpx::execution::experimental::stdexec_internal::__single_sender_value_t<
Sender, Env>;
Expand Down Expand Up @@ -1008,8 +1008,9 @@ namespace hpx::execution::experimental {
value_or_void_t<Value>, std::exception_ptr>;

template <typename Promise>
using coroutine_env_t = hpx::util::detected_or<exec_envs::empty_env,
hpx::functional::tag_invoke_result_t, get_env_t, Promise>;
using coroutine_env_t =
hpx::util::detected_or<hpx::execution::experimental::empty_env,
hpx::functional::tag_invoke_result_t, get_env_t, Promise>;

template <typename Value>
struct receiver_base
Expand Down
Loading
Loading