Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ namespace hpx {
#include <hpx/execution/algorithms/detail/predicates.hpp>
#include <hpx/execution/executors/execution.hpp>
#include <hpx/execution/executors/execution_parameters.hpp>
#include <hpx/execution_base/stdexec_forward.hpp>
#include <hpx/executors/exception_list.hpp>
#include <hpx/executors/execution_policy.hpp>
#include <hpx/functional/invoke.hpp>
Expand Down Expand Up @@ -531,26 +532,43 @@ namespace hpx::parallel {

template <typename ExPolicy, typename Iter, typename Sent,
typename Comp, typename Proj>
static util::detail::algorithm_result_t<ExPolicy, Iter> parallel(
ExPolicy&& policy, Iter first, Iter middle, Sent last, Comp&& comp,
Proj&& proj)
static decltype(auto) parallel(ExPolicy&& policy, Iter first,
Iter middle, Sent last, Comp&& comp, Proj&& proj)
{
using algorithm_result =
util::detail::algorithm_result<ExPolicy, Iter>;
constexpr bool has_scheduler_executor =
hpx::execution_policy_has_scheduler_executor_v<ExPolicy>;

try
if constexpr (has_scheduler_executor)
{
// call the sort routine and return the right type,
// depending on execution policy
return algorithm_result::get(parallel_partial_sort(
HPX_FORWARD(ExPolicy, policy), first, middle, last,
util::compare_projected<Comp&, Proj&>(comp, proj)));
namespace ex = hpx::execution::experimental;
return ex::just(first, middle, last) |
ex::then([comp = HPX_FORWARD(Comp, comp),
proj = HPX_FORWARD(Proj, proj)](
Iter first, Iter middle, Iter last) -> Iter {
return sequential_partial_sort(first, middle, last,
util::compare_projected<std::decay_t<Comp>,
std::decay_t<Proj>>(comp, proj));
});
}
catch (...)
else
{
return algorithm_result::get(
detail::handle_exception<ExPolicy, Iter>::call(
std::current_exception()));
using algorithm_result =
util::detail::algorithm_result<ExPolicy, Iter>;

try
{
// call the sort routine and return the right type,
// depending on execution policy
return algorithm_result::get(parallel_partial_sort(
HPX_FORWARD(ExPolicy, policy), first, middle, last,
util::compare_projected<Comp&, Proj&>(comp, proj)));
}
catch (...)
{
return algorithm_result::get(
detail::handle_exception<ExPolicy, Iter>::call(
std::current_exception()));
}
}
}
};
Expand Down Expand Up @@ -595,9 +613,9 @@ namespace hpx {
>
)>
// clang-format on
friend parallel::util::detail::algorithm_result_t<ExPolicy, RandIter>
tag_fallback_invoke(hpx::partial_sort_t, ExPolicy&& policy,
RandIter first, RandIter middle, RandIter last, Comp comp = Comp())
friend decltype(auto) tag_fallback_invoke(hpx::partial_sort_t,
ExPolicy&& policy, RandIter first, RandIter middle, RandIter last,
Comp comp = Comp())
{
static_assert(hpx::traits::is_random_access_iterator_v<RandIter>,
"Requires at least random access iterator.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ if(HPX_WITH_STDEXEC)
mismatch_binary_sender
move_sender
none_of_sender
partial_sort_sender
reduce_sender
remove_sender
remove_if_sender
Expand Down
117 changes: 117 additions & 0 deletions libs/core/algorithms/tests/unit/algorithms/partial_sort_sender.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) 2024 Tobias Wukovitsch
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/algorithm.hpp>
#include <hpx/execution.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/testing.hpp>

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <random>
#include <string>
#include <vector>

#include "test_utils.hpp"

////////////////////////////////////////////////////////////////////////////
unsigned int seed = std::random_device{}();
std::mt19937 gen(seed);
constexpr std::uint64_t SIZE{1007};

template <typename LnPolicy, typename ExPolicy, typename IteratorTag>
void test_partial_sort_sender(
LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag)
{
static_assert(hpx::is_async_execution_policy_v<ExPolicy>,
"hpx::is_async_execution_policy_v<ExPolicy>");

using compare_t = std::less<std::uint64_t>;
using base_iterator = std::vector<std::uint64_t>::iterator;
using iterator = test::test_iterator<base_iterator, IteratorTag>;

namespace ex = hpx::execution::experimental;
namespace tt = hpx::this_thread::experimental;
using scheduler_t = ex::thread_pool_policy_scheduler<LnPolicy>;

std::vector<std::uint64_t> A, B;
A.reserve(SIZE);
B.reserve(SIZE);

for (std::uint64_t i = 0; i < SIZE; ++i)
{
A.emplace_back(i);
}
std::shuffle(A.begin(), A.end(), gen);

for (std::uint64_t i = 1; i < SIZE; ++i)
{
B = A;

auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy));

tt::sync_wait(
ex::just(iterator(std::begin(B)), iterator(std::begin(B) + i),
iterator(std::end(B)), compare_t{}) |
hpx::partial_sort(ex_policy.on(exec)));

for (std::uint64_t j = 0; j < i; ++j)
{
HPX_TEST(B[j] == j);
}
}
}

template <typename IteratorTag>
void partial_sort_sender_test()
{
using namespace hpx::execution;
test_partial_sort_sender(hpx::launch::sync, seq(task), IteratorTag());
test_partial_sort_sender(hpx::launch::sync, unseq(task), IteratorTag());

test_partial_sort_sender(hpx::launch::async, par(task), IteratorTag());
test_partial_sort_sender(
hpx::launch::async, par_unseq(task), IteratorTag());
}

int hpx_main(hpx::program_options::variables_map& vm)
{
unsigned int seed = (unsigned int) std::time(nullptr);
if (vm.count("seed"))
seed = vm["seed"].as<unsigned int>();

std::cout << "using seed: " << seed << std::endl;
std::srand(seed);

partial_sort_sender_test<std::random_access_iterator_tag>();

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
// add command line option which controls the random number generator seed
using namespace hpx::program_options;
options_description desc_commandline(
"Usage: " HPX_APPLICATION_STRING " [options]");

desc_commandline.add_options()("seed,s", value<unsigned int>(),
"the random number generator seed to use for this run");

// By default this test should run on all available cores
std::vector<std::string> const cfg = {"hpx.os_threads=all"};

// Initialize and run HPX
hpx::local::init_params init_args;
init_args.desc_cmdline = desc_commandline;
init_args.cfg = cfg;

HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0,
"HPX main exited with non-zero status");

return hpx::util::report_errors();
}
Loading