Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
3e13add
updated algorithm_bulk test file
charan-003 Jul 18, 2025
d62a019
updated thread_pool_bulk
charan-003 Jul 19, 2025
1435011
implementing stdexec bulk in thread pool scheduler bulk v1
charan-003 Aug 2, 2025
e359e65
implementing stdexec bulk in thread pool scheduler bulk v2
charan-003 Aug 2, 2025
5c16588
implementing stdexec bulk in thread pool scheduler bulk v3
charan-003 Aug 2, 2025
b134e03
changing tag invoke to domain
charan-003 Aug 6, 2025
4b1074b
changing tag invoke to domain v2
charan-003 Aug 6, 2025
c3fcbda
changing tag invoke to domain v3
charan-003 Aug 7, 2025
9e6fe4a
implementing stdexec bulk and domain v1
charan-003 Aug 10, 2025
dfedfc8
clean up header dependencies
charan-003 Aug 12, 2025
b12bfb3
HPX thread pool with domain-based customization
charan-003 Aug 12, 2025
cae2eb4
Merge branch 'STEllAR-GROUP:master' into feature/forward-bulk
charan-003 Aug 13, 2025
ff8a35c
fixing minor errrors v1
Aug 13, 2025
e2de597
fixing errors 1
Aug 13, 2025
75409da
fixing minor errrors v2
Aug 13, 2025
e30e63f
fixing minor errrors v2.1
Aug 13, 2025
8c920f3
fixing minor errrors v3
Aug 14, 2025
0448223
keeping back the old version
Aug 15, 2025
cd232b3
keeping back the old version v1
Aug 15, 2025
cc5ea49
keeping back the old version v2
Aug 15, 2025
e3f489e
keeping back the old version v2.1
Aug 15, 2025
806f82c
keeping back the old version v3
Aug 16, 2025
7c8c0d2
fixing sender related errors v1
Aug 16, 2025
e95e281
fixing sender related errors v2
Aug 16, 2025
7febc0f
fixing sender related errors v3
Aug 16, 2025
d512cfb
fixing sender related errors v3.1
Aug 16, 2025
b362a07
fix continue on errors
Aug 16, 2025
89bc8e5
fix continue on errorss
Aug 16, 2025
3f3b523
fix errors
Aug 16, 2025
348ed1f
fix errors-1
Aug 16, 2025
bcba0d6
fixing macos errors
Aug 17, 2025
6ddf196
fixing macos errors thread pool scheduler
Aug 17, 2025
5443436
fix errrors
Aug 17, 2025
c0775b7
erors
Aug 19, 2025
22554b3
going bac
Aug 19, 2025
c88dbe9
fixing format
charan-003 Aug 19, 2025
b31dddb
macos errors
charan-003 Aug 19, 2025
4d1d90c
Merge branch 'master' into feature/forward-bulk
charan-003 Aug 19, 2025
318cafe
macos errors 1
charan-003 Aug 19, 2025
51a378e
t reset --hard HEAD~1
charan-003 Aug 19, 2025
15f19e6
macos errors.
charan-003 Aug 20, 2025
aa26777
macos errors..
charan-003 Aug 20, 2025
555f473
Merge commit 'aa26777503' into feature/forward-bulk
charan-003 Aug 20, 2025
581f49f
macos errors..
charan-003 Aug 20, 2025
03322c0
Merge remote-tracking branch 'upstream/master' into feature/forward-bulk
Aug 27, 2025
54bc90a
new changes
Aug 27, 2025
2472a9c
new changes -1
Aug 27, 2025
d0353fb
new changes 1
Aug 27, 2025
ecd9bb0
minor changes
Sep 1, 2025
61a9ccd
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 1, 2025
85e287e
minor changes
Sep 2, 2025
dc7653c
adding back stdexec execution polocies
Sep 2, 2025
78abc27
typo
Sep 2, 2025
701a695
Half-working changes from Sep-03 meeting
isidorostsa Sep 3, 2025
242b506
Rename transfer to continues on in tps test
isidorostsa Sep 6, 2025
c714324
Comment out test for the currently unimplemented bulk_chunked
isidorostsa Sep 6, 2025
961f198
Revert thread_pool_scheduler to use ranges,
isidorostsa Sep 6, 2025
f1625e4
Use ranges::range_value_t instead of hpx::iter_value<hpx::range_iter.…
isidorostsa Sep 6, 2025
a005ea4
Transform sender feeds an iota range to the thread_pool_bulk_sender f…
isidorostsa Sep 6, 2025
856bf42
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 6, 2025
515faa4
P3481R5 bulk execution compliance in HPX bulk
Sep 9, 2025
0d708d0
P3481R5 bulk execution compliance in HPX bulk .
Sep 9, 2025
6c8aea6
macos errors
Sep 9, 2025
269e2b3
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 10, 2025
77620dd
fix format
Sep 10, 2025
58c6952
Domain Transform Logic
Sep 10, 2025
4d65eeb
Domain Transform Logic fix format
Sep 10, 2025
73ae6fb
macos errrors
Sep 10, 2025
3119ef4
detect bulk as default implementation to bulk chunked
Sep 14, 2025
f3fc6fa
Merge branch 'master' into feature/forward-bulk
charan-003 Sep 14, 2025
6ada3c8
revert back bulk_t
Sep 15, 2025
6a8fb0e
bulk_chunked uses f(start,end,...)
Sep 15, 2025
13b291e
remove bulk_t
Sep 15, 2025
e9eefdd
formatting
Sep 16, 2025
c8e8d9c
formatting fix
Sep 16, 2025
2f9b3b5
formatting fix arrow
Sep 16, 2025
b18a39a
minor fix
Sep 17, 2025
a568202
removing continue on
Sep 17, 2025
36ae7f4
format
Sep 17, 2025
e9685b2
Concept Check set_value got correct F
isidorostsa Sep 18, 2025
ab206b1
Revert "Concept Check set_value got correct F"
isidorostsa Sep 24, 2025
79f731c
old implementation of bulk
Oct 2, 2025
ee410cf
include header
Oct 2, 2025
e38f0a4
Merge branch 'master' into feature/forward-bulk
charan-003 Oct 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 1 addition & 295 deletions libs/core/execution/include/hpx/execution/algorithms/bulk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,300 +11,6 @@

#if defined(HPX_HAVE_STDEXEC)
#include <hpx/execution_base/stdexec_forward.hpp>
#endif

#include <hpx/concepts/concepts.hpp>
#include <hpx/datastructures/tuple.hpp>
#include <hpx/datastructures/variant.hpp>
#include <hpx/errors/try_catch_exception_ptr.hpp>
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
#include <hpx/execution/algorithms/then.hpp>
#include <hpx/execution_base/completion_scheduler.hpp>
#include <hpx/execution_base/completion_signatures.hpp>
#include <hpx/execution_base/receiver.hpp>
#include <hpx/execution_base/sender.hpp>
#include <hpx/functional/detail/tag_priority_invoke.hpp>
#include <hpx/functional/invoke_result.hpp>
#include <hpx/iterator_support/counting_shape.hpp>
#include <hpx/type_support/pack.hpp>

#include <exception>
#include <iterator>
#include <type_traits>
#include <utility>

namespace hpx::execution::experimental {

///////////////////////////////////////////////////////////////////////////
namespace detail {

template <typename Sender, typename Shape, typename F>
struct bulk_sender
{
HPX_NO_UNIQUE_ADDRESS std::decay_t<Sender> sender;
HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape;
HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;

#if defined(HPX_HAVE_STDEXEC)
using sender_concept = hpx::execution::experimental::sender_t;

template <typename... Args>
using default_set_value =
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_value_t(Args...)>;

template <typename Arg>
using default_set_error =
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_error_t(Arg)>;

using disable_set_stopped =
hpx::execution::experimental::completion_signatures<>;

// clang-format off
template <typename Env>
friend auto tag_invoke(get_completion_signatures_t,
bulk_sender const&, Env) noexcept -> hpx::execution::
experimental::transform_completion_signatures_of<Sender, Env,
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_error_t(
std::exception_ptr)>,
default_set_value, default_set_error, disable_set_stopped>;
// clang-format on

friend constexpr auto tag_invoke(
hpx::execution::experimental::get_env_t,
bulk_sender const& s) noexcept
{
return hpx::execution::experimental::get_env(s.sender);
}
#else
using is_sender = void;

template <typename Env>
struct generate_completion_signatures
{
template <template <typename...> typename Tuple,
template <typename...> typename Variant>
using value_types =
value_types_of_t<Sender, Env, Tuple, Variant>;

template <template <typename...> typename Variant>
using error_types = hpx::util::detail::unique_concat_t<
error_types_of_t<Sender, Env, Variant>,
Variant<std::exception_ptr>>;

static constexpr bool sends_stopped = false;
};

// clang-format off
template <typename Env>
friend auto tag_invoke(
get_completion_signatures_t, bulk_sender const&, Env) noexcept
-> generate_completion_signatures<Env>;
// clang-format on

// clang-format off
template <typename CPO,
HPX_CONCEPT_REQUIRES_(
hpx::execution::experimental::detail::is_receiver_cpo_v<CPO> &&
hpx::execution::experimental::detail::has_completion_scheduler_v<
CPO, std::decay_t<Sender>>
)>
// clang-format on
friend constexpr auto tag_invoke(
hpx::execution::experimental::get_completion_scheduler_t<CPO>
tag,
bulk_sender const& s)
{
return tag(s.sender);
}
#endif
template <typename Receiver>
struct bulk_receiver
{
#if defined(HPX_HAVE_STDEXEC)
using receiver_concept =
hpx::execution::experimental::receiver_t;
#endif
HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape;
HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;

template <typename Receiver_, typename Shape_, typename F_>
bulk_receiver(Receiver_&& receiver, Shape_&& shape, F_&& f)
: receiver(HPX_FORWARD(Receiver_, receiver))
, shape(HPX_FORWARD(Shape_, shape))
, f(HPX_FORWARD(F_, f))
{
}

template <typename Error>
friend void tag_invoke(
set_error_t, bulk_receiver&& r, Error&& error) noexcept
{
hpx::execution::experimental::set_error(
HPX_MOVE(r.receiver), HPX_FORWARD(Error, error));
}

friend void tag_invoke(
set_stopped_t, bulk_receiver&& r) noexcept
{
hpx::execution::experimental::set_stopped(
HPX_MOVE(r.receiver));
}

template <typename... Ts>
#if defined(HPX_HAVE_STDEXEC)
void set_value(Ts&&... ts) noexcept
#else
void set_value(Ts&&... ts)
#endif
{
hpx::detail::try_catch_exception_ptr(
[&]() {
for (auto const& s : shape)
{
HPX_INVOKE(f, s, ts...);
}
hpx::execution::experimental::set_value(
HPX_MOVE(receiver), HPX_FORWARD(Ts, ts)...);
},
[&](std::exception_ptr ep) {
hpx::execution::experimental::set_error(
HPX_MOVE(receiver), HPX_MOVE(ep));
});
}

template <typename... Ts>
friend auto tag_invoke(
set_value_t, bulk_receiver&& r, Ts&&... ts) noexcept
-> decltype(hpx::execution::experimental::set_value(
std::declval<std::decay_t<Receiver>&&>(),
HPX_FORWARD(Ts, ts)...),
void())
{
// set_value is in a member function only because of a
// compiler bug in GCC 7. When the body of set_value is
// inlined here compilation fails with an internal compiler
// error.
r.set_value(HPX_FORWARD(Ts, ts)...);
}
};

template <typename Receiver>
friend auto tag_invoke(
connect_t, bulk_sender&& s, Receiver&& receiver)
{
return hpx::execution::experimental::connect(HPX_MOVE(s.sender),
bulk_receiver<Receiver>(HPX_FORWARD(Receiver, receiver),
HPX_MOVE(s.shape), HPX_MOVE(s.f)));
}

template <typename Receiver>
friend auto tag_invoke(
connect_t, bulk_sender& s, Receiver&& receiver)
{
return hpx::execution::experimental::connect(s.sender,
bulk_receiver<Receiver>(
HPX_FORWARD(Receiver, receiver), s.shape, s.f));
}
};
} // namespace detail

///////////////////////////////////////////////////////////////////////////
//
// execution::bulk is used to run a task repeatedly for every index in an
// index space.
//
// Returns a sender describing the task of invoking the provided function
// with every index in the provided shape along with the values sent by the
// input sender. The returned sender completes once all invocations have
// completed, or an error has occurred. If it completes by sending values,
// they are equivalent to those sent by the input sender.
//
// No instance of function will begin executing until the returned sender is
// started. Each invocation of function runs in an execution agent whose
// forward progress guarantees are determined by the scheduler on which they
// are run. All agents created by a single use of bulk execute with the same
// guarantee. This allows, for instance, a scheduler to execute all
// invocations of the function in parallel.
//
// The bulk operation is intended to be used at the point where the number
// of agents to be created is known and provided to bulk via its shape
// parameter. For some parallel computations, the number of agents to be
// created may be a function of the input data or dynamic conditions of the
// execution environment. In such cases, bulk can be combined with
// additional operations such as let_value to deliver dynamic shape
// information to the bulk operation.
//
inline constexpr struct bulk_t final
: hpx::functional::detail::tag_priority<bulk_t>
{
private:
// clang-format off
template <typename Sender, typename Shape, typename F,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender> &&
experimental::detail::is_completion_scheduler_tag_invocable_v<
hpx::execution::experimental::set_value_t, Sender,
bulk_t, Shape, F
>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
bulk_t, Sender&& sender, Shape const& shape, F&& f)
{
auto scheduler =
hpx::execution::experimental::get_completion_scheduler<
hpx::execution::experimental::set_value_t>(
#if defined(HPX_HAVE_STDEXEC)
hpx::execution::experimental::get_env(sender)
#else
sender
#endif
);

return hpx::functional::tag_invoke(bulk_t{}, HPX_MOVE(scheduler),
HPX_FORWARD(Sender, sender), shape, HPX_FORWARD(F, f));
}

// clang-format off
template <typename Sender, typename Shape, typename F,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender> &&
std::is_integral_v<Shape>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
bulk_t, Sender&& sender, Shape const& shape, F&& f)
{
return detail::bulk_sender<Sender, hpx::util::counting_shape<Shape>,
F>{HPX_FORWARD(Sender, sender),
hpx::util::counting_shape(shape), HPX_FORWARD(F, f)};
}

// clang-format off
template <typename Sender, typename Shape, typename F,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender> &&
!std::is_integral_v<std::decay_t<Shape>>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
bulk_t, Sender&& sender, Shape&& shape, F&& f)
{
return detail::bulk_sender<Sender, Shape, F>{
HPX_FORWARD(Sender, sender), HPX_FORWARD(Shape, shape),
HPX_FORWARD(F, f)};
}

template <typename Shape, typename F>
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
bulk_t, Shape&& shape, F&& f)
{
return detail::partial_algorithm<bulk_t, Shape, F>{
HPX_FORWARD(Shape, shape), HPX_FORWARD(F, f)};
}
} bulk{};
} // namespace hpx::execution::experimental
#endif
12 changes: 12 additions & 0 deletions libs/core/execution/tests/unit/algorithm_bulk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ struct custom_bulk_operation
}
};

#if !defined(HPX_HAVE_STDEXEC)
template <typename S>
auto tag_invoke(ex::bulk_t, S&& s, int num, custom_bulk_operation t)
{
t.tag_invoke_overload_called = true;
return ex::bulk(
std::forward<S>(s), num, [t = std::move(t)](int n) { t(n); });
}
#endif

int main()
{
Expand Down Expand Up @@ -284,7 +286,12 @@ int main()
auto os = ex::connect(std::move(s), std::move(r));
ex::start(os);
HPX_TEST(receiver_set_value_called);
#if defined(HPX_HAVE_STDEXEC)
// stdexec doesn't use tag_invoke for bulk customization
HPX_TEST(!tag_invoke_overload_called);
#else
HPX_TEST(tag_invoke_overload_called);
#endif
HPX_TEST(custom_bulk_call_operator_called);
HPX_TEST_EQ(custom_bulk_call_count, 10);
}
Expand Down Expand Up @@ -395,7 +402,12 @@ int main()
auto os = ex::connect(std::move(s), std::move(r));
ex::start(os);
HPX_TEST(receiver_set_error_called);
#if defined(HPX_HAVE_STDEXEC)
// stdexec doesn't use tag_invoke for bulk customization
HPX_TEST(!tag_invoke_overload_called);
#else
HPX_TEST(tag_invoke_overload_called);
#endif
HPX_TEST(custom_bulk_call_operator_called);
HPX_TEST_EQ(custom_bulk_call_count, 3);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,23 @@ namespace hpx::execution::experimental {
using stdexec::transfer_just;
using stdexec::transfer_just_t;

// Bulk (NOT FORWARDED)
// using stdexec::bulk_t;
// using stdexec::bulk;
// Bulk
using stdexec::bulk;
using stdexec::bulk_chunked;
using stdexec::bulk_chunked_t;
using stdexec::bulk_t;
using stdexec::bulk_unchunked;
using stdexec::bulk_unchunked_t;

// Execution policies (required for stdexec bulk)
using stdexec::par;
using stdexec::par_unseq;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add those to a namespace simillar to what we did with "stdexec_non_standard_tag_inoke" below?

using stdexec::parallel_policy;
using stdexec::parallel_unsequenced_policy;
using stdexec::seq;
using stdexec::sequenced_policy;
using stdexec::unseq;
using stdexec::unsequenced_policy;

// Split
using stdexec::split;
Expand Down
Loading
Loading