Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions libs/full/collectives/include/hpx/collectives/broadcast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,79 @@ namespace hpx::collectives {
this_site, generation, root_site),
this_site);
}

template <typename T>
hpx::future<std::decay_t<T>> broadcast_to_hierarchically(
std::vector<std::tuple<communicator,int>> communicators,
T&& local_result,
this_site_arg this_site = this_site_arg(),
generation_arg generation = generation_arg(),
root_site_arg root_site = root_site_arg(),
int arity = 2)
{
if (this_site == static_cast<std::size_t>(-1))
{
this_site = agas::get_locality_id();
}
if (generation == 0)
{
return hpx::make_exceptional_future<T>(HPX_GET_EXCEPTION(
hpx::error::bad_parameter, "hpx::collectives::scatter_to",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name needs to be corrected

"the generation number shouldn't be zero"));
}

communicator current_communicator = std::get<0>(communicators[0]);
int current_site = std::get<1>(communicators[0]);
if (this_site == root_site)
{
T current_local_result = std::move(local_result);
for (int i = 0; i < communicators.size()-1;i++)
{
current_communicator = std::get<0>(communicators[i]);
current_local_result = broadcast_to(current_communicator, std::move(current_local_result), generation, this_site_arg(0)).get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do have a synchronous overload of broadcast. Please consider using it instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use HPX_MOVE instead of std::move for HPX implementation files (not tests or examples, though).

}
current_communicator = std::get<0>(communicators[communicators.size()-1]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
current_communicator = std::get<0>(communicators[communicators.size()-1]);
current_communicator = std::get<0>(communicators.back());

return broadcast_to(current_communicator, std::move(current_local_result), generation, this_site_arg(0));
}
}

template <typename T>
hpx::future<T> broadcast_from_hierarchically(
std::vector<std::tuple<communicator,int>> communicators,
this_site_arg this_site = this_site_arg(),
generation_arg generation = generation_arg(),
root_site_arg root_site = root_site_arg(),
int arity = 2)
{
if (this_site == static_cast<std::size_t>(-1))
{
this_site = agas::get_locality_id();
}
if (generation == 0)
{
return hpx::make_exceptional_future<T>(HPX_GET_EXCEPTION(
hpx::error::bad_parameter, "hpx::collectives::scatter_to",
"the generation number shouldn't be zero"));
}

communicator current_communicator = std::get<0>(communicators[0]);
int current_site = std::get<1>(communicators[0]);
Comment on lines +450 to +451
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
communicator current_communicator = std::get<0>(communicators[0]);
int current_site = std::get<1>(communicators[0]);
auto [current_communicator, current_site] = communicators[0];

if (this_site != root_site && communicators.size()>1)
{
T current_local_result = broadcast_from<T>(current_communicator, generation, this_site_arg(current_site)).get();
for (int i = 1; i < communicators.size()-1; i++)
{
current_communicator = std::get<0>(communicators[i]);
current_local_result = broadcast_to(current_communicator, std::move(current_local_result), generation, this_site_arg(0)).get();
}
current_communicator = std::get<0>(communicators[communicators.size()-1]);
return broadcast_to(current_communicator, std::move(current_local_result), generation, this_site_arg(0));
}
else if (this_site != root_site )
{
return broadcast_from<T>(current_communicator, generation, this_site_arg(current_site));
}
}
} // namespace hpx::collectives

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,39 @@ namespace hpx { namespace collectives {
num_sites_arg num_sites, this_site_arg this_site,
generation_arg generation = generation_arg(),
root_site_arg root_site = root_site_arg());

/// Create a new communicator object usable with any collective operation
///
/// This functions creates a new communicator object that can be called in
/// order to pre-allocate a communicator object usable with multiple
/// invocations of any of the collective operations (such as \a all_gather,
/// \a all_reduce, \a all_to_all, \a broadcast, etc.).
///
/// \param basename The base name identifying the collective operation
/// \param num_sites The number of participating sites (default: all
/// localities).
/// \param this_site The sequence number of this invocation (usually
/// the locality id). This value is optional and
/// defaults to whatever hpx::get_locality_id() returns.
/// \param generation The generational counter identifying the sequence
/// number of the collective operation performed on the
/// given base name. This is optional and needs to be
/// supplied only if the collective operation on the
/// given base name has to be performed more than once.
/// \param root_site The site that is responsible for creating the
/// collective support object. This value is optional
/// and defaults to '0' (zero).
///
/// \returns This function returns a new communicator object usable
/// with the collective operation.
///
communicator create_hierarchical_communicator(char const* basename,
num_sites_arg num_sites = num_sites_arg(),
this_site_arg this_site = this_site_arg(),
generation_arg generation = generation_arg(),
root_site_arg root_site = root_site_arg(),
arity_arg arity = arity_arg());

}}
// clang-format on

Expand Down Expand Up @@ -197,6 +230,14 @@ namespace hpx::collectives {
generation_arg generation = generation_arg(),
root_site_arg root_site = root_site_arg());

HPX_EXPORT std::vector<std::tuple<communicator,int>> create_hierarchical_communicator(char const* basename,
num_sites_arg num_sites = num_sites_arg(),
this_site_arg this_site = this_site_arg(),
generation_arg generation = generation_arg(),
root_site_arg root_site = root_site_arg(),
int arity = 4);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please introduce a new tag type for the arity.

std::vector<std::tuple<communicator,int>> recursively_fill_communicators(std::vector<std::tuple<communicator,int>> communicators, int left, int right, std::string basename, int arity, int max_depth, int this_site, int num_sites, generation_arg generation);

} // namespace hpx::collectives

#endif // !HPX_COMPUTE_DEVICE_CODE
Expand Down
108 changes: 108 additions & 0 deletions libs/full/collectives/include/hpx/collectives/gather.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,114 @@ namespace hpx::collectives {
this_site, generation, root_site),
HPX_FORWARD(T, local_result), this_site);
}

template <typename T>
std::vector<T> flatten_vector(communicator fid, std::vector<std::vector<T>>&& dimensional_vector)
{
std::vector<std::vector<T>> non_flat_vector = std::move(dimensional_vector);
std::vector<T> current_local_result;
size_t totalSize = 0;
for (const auto& row : non_flat_vector) {
totalSize += row.size();
}
current_local_result.reserve(totalSize);
for (auto& row : non_flat_vector) {
current_local_result.insert(current_local_result.end(), std::make_move_iterator(row.begin()), std::make_move_iterator(row.end()));
}
return std::move(current_local_result);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never use std::move on a local variable being returned from a function. This disables RVO in the compiler.

}


template <typename T>
std::vector<T> gather_here_hierarchically(
std::vector<std::tuple<communicator,int>> communicators,
T&& local_result,
this_site_arg this_site = this_site_arg(),
generation_arg generation = generation_arg(),
root_site_arg root_site = root_site_arg(),
int arity = 2)
{
if (this_site == static_cast<std::size_t>(-1))
{
this_site = agas::get_locality_id();
}
/* if (generation == 0)
{
return hpx::make_exceptional_future<T>(HPX_GET_EXCEPTION(
hpx::error::bad_parameter, "hpx::collectives::scatter_to",
"the generation number shouldn't be zero"));
} */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove code that has been commented out.


communicator current_communicator = std::get<0>(communicators[0]);
int current_site = std::get<1>(communicators[0]);
if (this_site == root_site)
{
std::vector<T> current_local_result;
current_local_result.push_back(std::move(local_result));
for (int i = communicators.size()-1; i > 0;i--)
{
current_communicator = std::get<0>(communicators[i]);
std::vector<std::vector<T>> in_between_result = gather_here(current_communicator, std::move(current_local_result), generation, this_site_arg(0)).get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do have a synchronous version of gather. Please consider using it instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving from current_local_result repeatedly is not what you want.

size_t totalSize = 0;
for (const auto& row : in_between_result) {
totalSize += row.size();
}
current_local_result.reserve(totalSize);
for (auto& row : in_between_result) {
current_local_result.insert(current_local_result.end(), std::make_move_iterator(row.begin()), std::make_move_iterator(row.end()));
}
}
current_communicator = std::get<0>(communicators[0]);
hpx::future<std::vector<std::vector<std::decay_t<T>>>> dimensional_vector = gather_here(current_communicator, std::move(current_local_result), generation, this_site_arg(0));
return flatten_vector<T>(current_communicator, std::move(dimensional_vector.get()));
}
}



template <typename T>
hpx::future<void> gather_there_hierarchically(
std::vector<std::tuple<communicator,int>> communicators,
T&& local_result,
this_site_arg this_site = this_site_arg(),
generation_arg generation = generation_arg(),
root_site_arg root_site = root_site_arg(),
int arity = 2)
{
if (this_site == static_cast<std::size_t>(-1))
{
this_site = agas::get_locality_id();
}
if (generation == 0)
{
return hpx::make_exceptional_future<T>(HPX_GET_EXCEPTION(
hpx::error::bad_parameter, "hpx::collectives::scatter_to",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name needs to be corrected.

"the generation number shouldn't be zero"));
}

communicator current_communicator = std::get<0>(communicators[0]);
int current_site = std::get<1>(communicators[0]);
if (this_site != root_site)
{
std::vector<T> current_local_result;
current_local_result.push_back(std::move(local_result));
for (int i = communicators.size()-1; i > 0;i--)
{
current_communicator = std::get<0>(communicators[i]);
std::vector<std::vector<T>> in_between_result = gather_here(current_communicator, std::move(current_local_result), generation, this_site_arg(0)).get();
size_t totalSize = 0;
for (const auto& row : in_between_result) {
totalSize += row.size();
}
current_local_result.reserve(totalSize);
for (auto& row : in_between_result) {
current_local_result.insert(current_local_result.end(), std::make_move_iterator(row.begin()), std::make_move_iterator(row.end()));
}
}
current_communicator = std::get<0>(communicators[0]);
return gather_there(current_communicator, std::move(current_local_result), generation, this_site_arg(current_site));
}
}
} // namespace hpx::collectives

///////////////////////////////////////////////////////////////////////////////
Expand Down
72 changes: 72 additions & 0 deletions libs/full/collectives/include/hpx/collectives/reduce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,78 @@ namespace hpx::collectives {
this_site, generation, root_site),
HPX_FORWARD(T, local_result), this_site);
}

template <typename T, typename F>
hpx::future<std::decay_t<T>> reduce_hierarchically_here(
std::vector<std::tuple<communicator,int>> communicators,
T&& local_result,
F&& op,
this_site_arg this_site = this_site_arg(),
generation_arg generation = generation_arg(),
root_site_arg root_site = root_site_arg(),
int arity = 2)
{
if (this_site == static_cast<std::size_t>(-1))
{
this_site = agas::get_locality_id();
}
if (generation == 0)
{
return hpx::make_exceptional_future<T>(HPX_GET_EXCEPTION(
hpx::error::bad_parameter, "hpx::collectives::scatter_to",
"the generation number shouldn't be zero"));
}

communicator current_communicator = std::get<0>(communicators[0]);
int current_site = std::get<1>(communicators[0]);
if (this_site == root_site)
{
T current_local_result = std::move(local_result);
for (int i = communicators.size()-1; i > 0;i--)
{
current_communicator = std::get<0>(communicators[i]);
current_local_result = reduce_here(current_communicator, std::move(current_local_result), std::forward<F>(op), generation, this_site_arg(0)).get();
}
current_communicator = std::get<0>(communicators[0]);
return reduce_here(current_communicator, std::move(current_local_result), std::forward<F>(op), generation, this_site_arg(0));
}
}

template <typename T, typename F>
hpx::future<void> reduce_hierarchically_there(
std::vector<std::tuple<communicator,int>> communicators,
T&& local_result,
F&& op,
this_site_arg this_site = this_site_arg(),
generation_arg generation = generation_arg(),
root_site_arg root_site = root_site_arg(),
int arity = 2)
{
if (this_site == static_cast<std::size_t>(-1))
{
this_site = agas::get_locality_id();
}
if (generation == 0)
{
return hpx::make_exceptional_future<T>(HPX_GET_EXCEPTION(
hpx::error::bad_parameter, "hpx::collectives::scatter_to",
"the generation number shouldn't be zero"));
}

communicator current_communicator = std::get<0>(communicators[0]);
int current_site = std::get<1>(communicators[0]);
if (this_site != root_site)
{
T current_local_result = std::move(local_result);
for (int i = communicators.size()-1; i > 0;i--)
{
current_communicator = std::get<0>(communicators[i]);
current_local_result = reduce_here(current_communicator, std::move(current_local_result), std::forward<F>(op), generation, this_site_arg(0)).get();
}
current_communicator = std::get<0>(communicators[0]);
return reduce_there(current_communicator, std::move(current_local_result), generation, this_site_arg(current_site));
}
}
} // namespace hpx::collectives

#endif // !HPX_COMPUTE_DEVICE_CODE
Expand Down
Loading