Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <hpx/execution_base/operation_state.hpp>
#include <hpx/execution_base/receiver.hpp>
#include <hpx/execution_base/sender.hpp>
#include <hpx/functional/tag_invoke.hpp>
#include <hpx/futures/detail/future_data.hpp>
#include <hpx/futures/future.hpp>
#include <hpx/futures/traits/acquire_shared_state.hpp>
Expand All @@ -25,7 +24,7 @@
#include <utility>

namespace hpx::execution::experimental {

namespace hpxexp = hpx::execution::experimental;
namespace detail {

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -54,10 +53,9 @@ namespace hpx::execution::experimental {
as_sender_operation_state& operator=(
as_sender_operation_state const&) = delete;

friend void tag_invoke(hpx::execution::experimental::start_t,
as_sender_operation_state& os) noexcept
void start() & noexcept
{
os.start_helper();
start_helper();
}

private:
Expand All @@ -79,19 +77,17 @@ namespace hpx::execution::experimental {
{
if constexpr (std::is_void_v<result_type>)
{
hpx::execution::experimental::set_value(
HPX_MOVE(receiver_));
hpxexp::set_value(HPX_MOVE(receiver_));
}
else
{
hpx::execution::experimental::set_value(
hpxexp::set_value(
HPX_MOVE(receiver_), future_.get());
}
}
else if (future_.has_exception())
{
hpx::execution::experimental::set_error(
HPX_MOVE(receiver_),
hpxexp::set_error(HPX_MOVE(receiver_),
future_.get_exception_ptr());
}
};
Expand Down Expand Up @@ -120,8 +116,7 @@ namespace hpx::execution::experimental {
}
},
[&](std::exception_ptr ep) {
hpx::execution::experimental::set_error(
HPX_MOVE(receiver_), HPX_MOVE(ep));
hpxexp::set_error(HPX_MOVE(receiver_), HPX_MOVE(ep));
});
}

Expand All @@ -140,22 +135,29 @@ namespace hpx::execution::experimental {
template <bool IsVoid, typename _result_type>
struct set_value_void_checked
{
using type = hpx::execution::experimental::set_value_t(
_result_type);
using type = hpxexp::set_value_t(_result_type);
};

template <typename _result_type>
struct set_value_void_checked<true, _result_type>
{
using type = hpx::execution::experimental::set_value_t();
using type = hpxexp::set_value_t();
};

using completion_signatures =
hpx::execution::experimental::completion_signatures<
typename set_value_void_checked<std::is_void_v<result_type>,
result_type>::type,
hpx::execution::experimental::set_error_t(
std::exception_ptr)>;
private:
using _completion_signatures = hpxexp::completion_signatures<
typename set_value_void_checked<std::is_void_v<result_type>,
result_type>::type,
hpxexp::set_error_t(std::exception_ptr)>;

public:
// According to P3557R2
template <typename Self>
static constexpr auto get_completion_signatures()
{
return _completion_signatures();
}

#else
// Sender compatibility
template <typename, typename T>
Expand Down Expand Up @@ -210,11 +212,13 @@ namespace hpx::execution::experimental {
as_sender_sender(as_sender_sender const&) = delete;
as_sender_sender& operator=(as_sender_sender const&) = delete;

// This is not a shared future, so this is only callable with u
// rvalues
template <typename Receiver>
friend as_sender_operation_state<Receiver, future_type> tag_invoke(
connect_t, as_sender_sender&& s, Receiver&& receiver)
as_sender_operation_state<Receiver, future_type> connect(
Receiver&& receiver) &&
{
return {HPX_FORWARD(Receiver, receiver), HPX_MOVE(s.future_)};
return {HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)};
}
};

Expand All @@ -241,17 +245,17 @@ namespace hpx::execution::experimental {
as_sender_sender& operator=(as_sender_sender const&) = default;

template <typename Receiver>
friend as_sender_operation_state<Receiver, future_type> tag_invoke(
connect_t, as_sender_sender&& s, Receiver&& receiver)
as_sender_operation_state<Receiver, future_type> connect(
Receiver&& receiver) &&
{
return {HPX_FORWARD(Receiver, receiver), HPX_MOVE(s.future_)};
return {HPX_FORWARD(Receiver, receiver), HPX_MOVE(future_)};
}

template <typename Receiver>
friend as_sender_operation_state<Receiver, future_type> tag_invoke(
connect_t, as_sender_sender& s, Receiver&& receiver)
as_sender_operation_state<Receiver, future_type> connect(
Receiver&& receiver) &
{
return {HPX_FORWARD(Receiver, receiver), s.future_};
return {HPX_FORWARD(Receiver, receiver), future_};
}
};
} // namespace detail
Expand Down
54 changes: 23 additions & 31 deletions libs/core/execution/include/hpx/execution/algorithms/bulk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <utility>

namespace hpx::execution::experimental {
namespace hpxexp = hpx::execution::experimental;

///////////////////////////////////////////////////////////////////////////
namespace detail {
Expand All @@ -46,37 +47,33 @@ namespace hpx::execution::experimental {
HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;

#if defined(HPX_HAVE_STDEXEC)
using sender_concept = hpx::execution::experimental::sender_t;
using sender_concept = hpxexp::sender_t;

template <typename... Args>
using default_set_value =
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_value_t(Args...)>;
hpxexp::completion_signatures<hpxexp::set_value_t(Args...)>;

template <typename Arg>
using default_set_error =
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_error_t(Arg)>;
hpxexp::completion_signatures<hpxexp::set_error_t(Arg)>;

using disable_set_stopped =
hpx::execution::experimental::completion_signatures<>;
using disable_set_stopped = hpxexp::completion_signatures<>;

// clang-format off
template <typename Env>
friend auto tag_invoke(get_completion_signatures_t,
bulk_sender const&, Env) noexcept -> hpx::execution::
experimental::transform_completion_signatures_of<Sender, Env,
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_error_t(
hpxexp::completion_signatures<
hpxexp::set_error_t(
std::exception_ptr)>,
default_set_value, default_set_error, disable_set_stopped>;
// clang-format on

friend constexpr auto tag_invoke(
hpx::execution::experimental::get_env_t,
bulk_sender const& s) noexcept
hpxexp::get_env_t, bulk_sender const& s) noexcept
{
return hpx::execution::experimental::get_env(s.sender);
return hpxexp::get_env(s.sender);
}
#else
using is_sender = void;
Expand Down Expand Up @@ -107,14 +104,13 @@ namespace hpx::execution::experimental {
// clang-format off
template <typename CPO,
HPX_CONCEPT_REQUIRES_(
hpx::execution::experimental::detail::is_receiver_cpo_v<CPO> &&
hpx::execution::experimental::detail::has_completion_scheduler_v<
hpxexp::detail::is_receiver_cpo_v<CPO> &&
hpxexp::detail::has_completion_scheduler_v<
CPO, std::decay_t<Sender>>
)>
// clang-format on
friend constexpr auto tag_invoke(
hpx::execution::experimental::get_completion_scheduler_t<CPO>
tag,
hpxexp::get_completion_scheduler_t<CPO> tag,
bulk_sender const& s)
{
return tag(s.sender);
Expand All @@ -124,8 +120,7 @@ namespace hpx::execution::experimental {
struct bulk_receiver
{
#if defined(HPX_HAVE_STDEXEC)
using receiver_concept =
hpx::execution::experimental::receiver_t;
using receiver_concept = hpxexp::receiver_t;
#endif
HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape;
Expand All @@ -143,15 +138,14 @@ namespace hpx::execution::experimental {
friend void tag_invoke(
set_error_t, bulk_receiver&& r, Error&& error) noexcept
{
hpx::execution::experimental::set_error(
hpxexp::set_error(
HPX_MOVE(r.receiver), HPX_FORWARD(Error, error));
}

friend void tag_invoke(
set_stopped_t, bulk_receiver&& r) noexcept
{
hpx::execution::experimental::set_stopped(
HPX_MOVE(r.receiver));
hpxexp::set_stopped(HPX_MOVE(r.receiver));
}

template <typename... Ts>
Expand All @@ -167,19 +161,18 @@ namespace hpx::execution::experimental {
{
HPX_INVOKE(f, s, ts...);
}
hpx::execution::experimental::set_value(
hpxexp::set_value(
HPX_MOVE(receiver), HPX_FORWARD(Ts, ts)...);
},
[&](std::exception_ptr ep) {
hpx::execution::experimental::set_error(
HPX_MOVE(receiver), HPX_MOVE(ep));
hpxexp::set_error(HPX_MOVE(receiver), HPX_MOVE(ep));
});
}

template <typename... Ts>
friend auto tag_invoke(
set_value_t, bulk_receiver&& r, Ts&&... ts) noexcept
-> decltype(hpx::execution::experimental::set_value(
-> decltype(hpxexp::set_value(
std::declval<std::decay_t<Receiver>&&>(),
HPX_FORWARD(Ts, ts)...),
void())
Expand All @@ -196,7 +189,7 @@ namespace hpx::execution::experimental {
friend auto tag_invoke(
connect_t, bulk_sender&& s, Receiver&& receiver)
{
return hpx::execution::experimental::connect(HPX_MOVE(s.sender),
return hpxexp::connect(HPX_MOVE(s.sender),
bulk_receiver<Receiver>(HPX_FORWARD(Receiver, receiver),
HPX_MOVE(s.shape), HPX_MOVE(s.f)));
}
Expand All @@ -205,7 +198,7 @@ namespace hpx::execution::experimental {
friend auto tag_invoke(
connect_t, bulk_sender& s, Receiver&& receiver)
{
return hpx::execution::experimental::connect(s.sender,
return hpxexp::connect(s.sender,
bulk_receiver<Receiver>(
HPX_FORWARD(Receiver, receiver), s.shape, s.f));
}
Expand Down Expand Up @@ -247,7 +240,7 @@ namespace hpx::execution::experimental {
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender> &&
experimental::detail::is_completion_scheduler_tag_invocable_v<
hpx::execution::experimental::set_value_t, Sender,
hpxexp::set_value_t, Sender,
bulk_t, Shape, F
>
)>
Expand All @@ -256,10 +249,9 @@ namespace hpx::execution::experimental {
bulk_t, Sender&& sender, Shape const& shape, F&& f)
{
auto scheduler =
hpx::execution::experimental::get_completion_scheduler<
hpx::execution::experimental::set_value_t>(
hpxexp::get_completion_scheduler<hpxexp::set_value_t>(
#if defined(HPX_HAVE_STDEXEC)
hpx::execution::experimental::get_env(sender)
hpxexp::get_env(sender)
#else
sender
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <utility>

namespace hpx::execution::experimental {
namespace hpxexp = hpx::execution::experimental;

namespace detail {

Expand Down Expand Up @@ -51,13 +52,12 @@ namespace hpx::execution::experimental {
// to move receiver and future into the on_completed
// callback.
state->set_on_completed([&os]() mutable {
hpx::execution::experimental::set_value(
hpxexp::set_value(
HPX_MOVE(os.receiver), HPX_MOVE(os.future));
});
},
[&](std::exception_ptr ep) {
hpx::execution::experimental::set_error(
HPX_MOVE(os.receiver), HPX_MOVE(ep));
hpxexp::set_error(HPX_MOVE(os.receiver), HPX_MOVE(ep));
});
}
};
Expand All @@ -68,11 +68,9 @@ namespace hpx::execution::experimental {
std::decay_t<Future> future;
#if defined(HPX_HAVE_STDEXEC)
using completion_signatures =
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_value_t(
std::decay_t<Future>),
hpx::execution::experimental::set_error_t(
std::exception_ptr)>;
hpxexp::completion_signatures<hpxexp::set_value_t(
std::decay_t<Future>),
hpxexp::set_error_t(std::exception_ptr)>;
#else
struct completion_signatures
{
Expand Down
Loading
Loading