8 changes: 4 additions & 4 deletions paddle/fluid/distributed/collective/common.cc
@@ -16,7 +16,7 @@

 namespace paddle::distributed {

-std::vector<Place> GetPlaceList(const std::vector<phi::DenseTensor>& tensors) {
+std::vector<Place> GetPlaceList(const std::vector<DenseTensor>& tensors) {
   std::vector<Place> places;
   places.reserve(tensors.size());
   for (auto& tensor : tensors) {
@@ -41,14 +41,14 @@ std::string GetKeyFromPlaces(const std::vector<Place>& places) {

 std::string GetKeyFromPlace(const Place& place) { return place.DebugString(); }

-bool CheckTensorsInCudaPlace(const std::vector<phi::DenseTensor>& tensors) {
+bool CheckTensorsInCudaPlace(const std::vector<DenseTensor>& tensors) {
   return std::all_of(
       tensors.cbegin(), tensors.cend(), [&](const phi::DenseTensor& t) {
         return phi::is_gpu_place(t.place());
       });
 }

-bool CheckTensorsInCustomPlace(const std::vector<phi::DenseTensor>& tensors,
+bool CheckTensorsInCustomPlace(const std::vector<DenseTensor>& tensors,
                                const std::string& dev_type) {
   return std::all_of(
       tensors.cbegin(), tensors.cend(), [&](const phi::DenseTensor& t) {
@@ -57,7 +57,7 @@ bool CheckTensorsInCustomPlace(const std::vector<phi::DenseTensor>& tensors,
       });
 }

-bool CheckTensorsInXPUPlace(const std::vector<phi::DenseTensor>& tensors) {
+bool CheckTensorsInXPUPlace(const std::vector<DenseTensor>& tensors) {
   return std::all_of(
       tensors.cbegin(), tensors.cend(), [&](const phi::DenseTensor& t) {
         return phi::is_xpu_place(t.place());
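All three place checks in common.cc share one idiom: std::all_of over the tensor list with a predicate on each tensor's place. A self-contained sketch of that idiom, not part of this PR, using stand-in types instead of phi::DenseTensor and phi::Place:

#include <algorithm>
#include <vector>

// Stand-ins for phi's tensor and place types; illustrative only.
enum class Place { kCPU, kGPU, kXPU };
struct Tensor {
  Place place;
};

// Mirrors CheckTensorsInCudaPlace: true iff every tensor lives on the GPU.
bool AllOnGpu(const std::vector<Tensor>& tensors) {
  return std::all_of(tensors.cbegin(), tensors.cend(),
                     [](const Tensor& t) { return t.place == Place::kGPU; });
}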
8 changes: 4 additions & 4 deletions paddle/fluid/distributed/collective/common.h
@@ -21,18 +21,18 @@ namespace distributed {

 using Place = phi::Place;
 // Get the list of devices from list of tensors
-std::vector<Place> GetPlaceList(const std::vector<phi::DenseTensor>& tensors);
+std::vector<Place> GetPlaceList(const std::vector<DenseTensor>& tensors);
 // Get the deviceList String from the list of devices
 std::string GetKeyFromPlaces(const std::vector<Place>& places);
 // Get the device string from one device
 std::string GetKeyFromPlace(const Place& place);

-bool CheckTensorsInCudaPlace(const std::vector<phi::DenseTensor>& tensors);
+bool CheckTensorsInCudaPlace(const std::vector<DenseTensor>& tensors);

-bool CheckTensorsInCustomPlace(const std::vector<phi::DenseTensor>& tensors,
+bool CheckTensorsInCustomPlace(const std::vector<DenseTensor>& tensors,
                                const std::string& dev_type);

-bool CheckTensorsInXPUPlace(const std::vector<phi::DenseTensor>& tensors);
+bool CheckTensorsInXPUPlace(const std::vector<DenseTensor>& tensors);

 }  // namespace distributed
 }  // namespace paddle
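The declarations above now spell the parameter type as plain DenseTensor inside namespace paddle::distributed, next to the existing `using Place = phi::Place;`. For these signatures to compile, the PR presumably adds (or already relies on) a matching namespace-level alias that this diff does not show; a minimal sketch of that assumption:

// Hypothetical alias, not visible in this diff: mirrors the existing
// `using Place = phi::Place;` so the shortened signatures resolve.
namespace paddle {
namespace distributed {
using DenseTensor = phi::DenseTensor;
}  // namespace distributed
}  // namespace paddle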
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/collective/mpi_tools.cc
@@ -39,7 +39,7 @@ MPI_Op ToMPIType(ReduceOp reduction) {

 // NOTE: MPI does not support CUDA aware now.
 bool CheckMpiCudaAware() { return false; }

-void CheckValidInputs(const std::vector<phi::DenseTensor>& tensors) {
+void CheckValidInputs(const std::vector<DenseTensor>& tensors) {
   PADDLE_ENFORCE_EQ(
       tensors.size() == 1,
       true,
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/collective/mpi_tools.h
@@ -46,7 +46,7 @@ MPI_Op ToMPIType(ReduceOp reduction);

 bool CheckMpiCudaAware();

-void CheckValidInputs(const std::vector<phi::DenseTensor>& tensors);
+void CheckValidInputs(const std::vector<DenseTensor>& tensors);

 }  // namespace mpi
 }  // namespace distributed
10 changes: 5 additions & 5 deletions paddle/fluid/distributed/collective/process_group.h
@@ -57,7 +57,7 @@ static void CheckTensorContiguous(const phi::DenseTensor& tensor) {
   }
 }

-static void CheckTensorContiguous(const std::vector<phi::DenseTensor>& inputs) {
+static void CheckTensorContiguous(const std::vector<DenseTensor>& inputs) {
   for (const auto& tensor : inputs) {
     if (!tensor.meta().is_contiguous()) {
       PADDLE_THROW(
@@ -66,7 +66,7 @@ static void CheckTensorContiguous(const std::vector<phi::DenseTensor>& inputs) {
   }
 }

-static void CheckTensorSamePlace(const std::vector<phi::DenseTensor>& tensors) {
+static void CheckTensorSamePlace(const std::vector<DenseTensor>& tensors) {
   for (const auto& tensor : tensors) {
     if (tensor.place() != tensors[0].place()) {
       PADDLE_THROW(
@@ -77,7 +77,7 @@ static void CheckTensorSamePlace(const std::vector<phi::DenseTensor>& tensors) {
 }

 static std::vector<int64_t> GetAllToAllSplitSizes(
-    const std::vector<phi::DenseTensor>& tensors) {
+    const std::vector<DenseTensor>& tensors) {
   std::vector<int64_t> split_sizes(tensors.size());
   std::transform(tensors.begin(),
                  tensors.end(),
@@ -87,7 +87,7 @@ static std::vector<int64_t> GetAllToAllSplitSizes(
 }

 static std::vector<const void*> GetTensorPtrs(
-    const std::vector<phi::DenseTensor>& tensors) {
+    const std::vector<DenseTensor>& tensors) {
   std::vector<const void*> tensor_ptrs(tensors.size());
   std::transform(tensors.begin(),
                  tensors.end(),
@@ -96,7 +96,7 @@ static std::vector<const void*> GetTensorPtrs(
   return tensor_ptrs;
 }

-static int64_t GetTensorNumel(const std::vector<phi::DenseTensor>& tensors) {
+static int64_t GetTensorNumel(const std::vector<DenseTensor>& tensors) {
   return std::accumulate(tensors.begin(),
                          tensors.end(),
                          int64_t(0),
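The helpers in process_group.h follow two STL patterns: std::transform to map each tensor to a per-tensor value (split sizes, raw pointers) and std::accumulate to fold the list into a total element count. A standalone sketch of both, not part of this PR, assuming only that each element reports numel() the way phi::DenseTensor does:

#include <algorithm>
#include <cstdint>
#include <numeric>
#include <vector>

// Stand-in for a tensor that knows its element count; illustrative only.
struct Tensor {
  int64_t n;
  int64_t numel() const { return n; }
};

// Mirrors GetAllToAllSplitSizes: one entry per tensor.
std::vector<int64_t> SplitSizes(const std::vector<Tensor>& ts) {
  std::vector<int64_t> sizes(ts.size());
  std::transform(ts.begin(), ts.end(), sizes.begin(),
                 [](const Tensor& t) { return t.numel(); });
  return sizes;
}

// Mirrors GetTensorNumel: total element count across the list.
int64_t TotalNumel(const std::vector<Tensor>& ts) {
  return std::accumulate(
      ts.begin(), ts.end(), int64_t(0),
      [](int64_t acc, const Tensor& t) { return acc + t.numel(); });
}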
52 changes: 26 additions & 26 deletions paddle/fluid/distributed/collective/process_group_bkcl.cc
@@ -108,7 +108,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Recv(
     bool sync_op,
     bool use_calc_stream) {
   // numel > 0 indicates the tensor need to be sliced
-  phi::DenseTensor partial_tensor;
+  DenseTensor partial_tensor;
   if (numel > 0) {
     partial_tensor = GetPartialTensor(*tensor, offset, numel);
     tensor = &partial_tensor;
@@ -246,7 +246,7 @@ void ProcessGroupBKCL::SyncCalcStream(const Place& place) {

 std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Collective(
     std::function<void(phi::distributed::BKCLCommContext*, XPUStream)> fn,
-    const std::vector<phi::DenseTensor>& tensors,
+    const std::vector<DenseTensor>& tensors,
     CommType op_type,
     bool sync_op,
     bool use_calc_stream) {
@@ -309,7 +309,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Collective(
     CommType op_type,
     bool sync_op,
     bool use_calc_stream) {
-  const std::vector<phi::DenseTensor> tensors = {tensor};
+  const std::vector<DenseTensor> tensors = {tensor};
   return Collective(fn, tensors, op_type, sync_op, use_calc_stream);
 }

@@ -505,15 +505,15 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::AllToAll(
       new paddle::experimental::DefaultAllocator(place));
   phi::DenseTensorMeta meta(phi::DataType::INT64, phi::DDim{nranks});
 #if defined(PADDLE_WITH_FLAGCX)
-  phi::DenseTensor in_size_tensor = {allocator_cpu.get(), meta};
-  phi::DenseTensor in_offset_tensor = {allocator_cpu.get(), meta};
-  phi::DenseTensor out_size_tensor = {allocator_cpu.get(), meta};
-  phi::DenseTensor out_offset_tensor = {allocator_cpu.get(), meta};
+  DenseTensor in_size_tensor = {allocator_cpu.get(), meta};
+  DenseTensor in_offset_tensor = {allocator_cpu.get(), meta};
+  DenseTensor out_size_tensor = {allocator_cpu.get(), meta};
+  DenseTensor out_offset_tensor = {allocator_cpu.get(), meta};
 #else
-  phi::DenseTensor in_size_tensor = {allocator.get(), meta};
-  phi::DenseTensor in_offset_tensor = {allocator.get(), meta};
-  phi::DenseTensor out_size_tensor = {allocator.get(), meta};
-  phi::DenseTensor out_offset_tensor = {allocator.get(), meta};
+  DenseTensor in_size_tensor = {allocator.get(), meta};
+  DenseTensor in_offset_tensor = {allocator.get(), meta};
+  DenseTensor out_size_tensor = {allocator.get(), meta};
+  DenseTensor out_offset_tensor = {allocator.get(), meta};
 #endif

 #if defined(PADDLE_WITH_FLAGCX)
@@ -580,8 +580,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::AllToAll(
 }

 std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::AllToAll(
-    std::vector<phi::DenseTensor>* out_tensors,
-    const std::vector<phi::DenseTensor>& in_tensors,
+    std::vector<DenseTensor>* out_tensors,
+    const std::vector<DenseTensor>& in_tensors,
     bool sync_op,
     bool use_calc_stream) {
   CheckTensorContiguous(in_tensors);
@@ -691,20 +691,20 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::AllToAll(
     phi::DenseTensorMeta split_meta(phi::DataType::INT64,
                                     phi::DDim{nranks});

-    phi::DenseTensor concated_in_tensor = {allocator.get(),
-                                           concated_in_tensor_meta};
-    phi::DenseTensor concated_out_tensor = {allocator.get(),
-                                            concated_out_tensor_meta};
+    DenseTensor concated_in_tensor = {allocator.get(),
+                                      concated_in_tensor_meta};
+    DenseTensor concated_out_tensor = {allocator.get(),
+                                       concated_out_tensor_meta};
 #if defined(PADDLE_WITH_FLAGCX)
-    phi::DenseTensor in_size_tensor = {allocator_cpu.get(), split_meta};
-    phi::DenseTensor in_offset_tensor = {allocator_cpu.get(), split_meta};
-    phi::DenseTensor out_size_tensor = {allocator_cpu.get(), split_meta};
-    phi::DenseTensor out_offset_tensor = {allocator_cpu.get(), split_meta};
+    DenseTensor in_size_tensor = {allocator_cpu.get(), split_meta};
+    DenseTensor in_offset_tensor = {allocator_cpu.get(), split_meta};
+    DenseTensor out_size_tensor = {allocator_cpu.get(), split_meta};
+    DenseTensor out_offset_tensor = {allocator_cpu.get(), split_meta};
 #else
-    phi::DenseTensor in_size_tensor = {allocator.get(), split_meta};
-    phi::DenseTensor in_offset_tensor = {allocator.get(), split_meta};
-    phi::DenseTensor out_size_tensor = {allocator.get(), split_meta};
-    phi::DenseTensor out_offset_tensor = {allocator.get(), split_meta};
+    DenseTensor in_size_tensor = {allocator.get(), split_meta};
+    DenseTensor in_offset_tensor = {allocator.get(), split_meta};
+    DenseTensor out_size_tensor = {allocator.get(), split_meta};
+    DenseTensor out_offset_tensor = {allocator.get(), split_meta};
 #endif

     if (in_numel_sum > 0) {
@@ -965,7 +965,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Barrier(
   auto allocator = std::unique_ptr<phi::Allocator>(
       new paddle::experimental::DefaultAllocator(place));
   phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1});
-  phi::DenseTensor barrier_tensor{allocator.get(), meta};
+  DenseTensor barrier_tensor{allocator.get(), meta};

   auto task = AllReduce(&barrier_tensor,
                         barrier_tensor,
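The Barrier hunk shows the underlying trick: there is no dedicated barrier primitive here, so the process group allocates a one-element FLOAT32 tensor and runs AllReduce on it, and the collective's completion serves as the synchronization point. A sketch of the same idea in plain MPI, not Paddle code:

#include <mpi.h>

// Barrier built from an all-reduce over a dummy one-element buffer.
// No rank can observe the result until every rank has contributed,
// so returning from the collective implies all ranks reached it.
void BarrierViaAllReduce(MPI_Comm comm) {
  float dummy = 0.0f;
  float result = 0.0f;
  MPI_Allreduce(&dummy, &result, 1, MPI_FLOAT, MPI_SUM, comm);
}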
6 changes: 3 additions & 3 deletions paddle/fluid/distributed/collective/process_group_bkcl.h
@@ -112,8 +112,8 @@ class ProcessGroupBKCL : public ProcessGroupWithStream {
       bool use_calc_stream) override;

   std::shared_ptr<ProcessGroup::Task> AllToAll(
-      std::vector<phi::DenseTensor>* out_tensors,
-      const std::vector<phi::DenseTensor>& in_tensors,
+      std::vector<DenseTensor>* out_tensors,
+      const std::vector<DenseTensor>& in_tensors,
       bool sync_op,
       bool use_calc_stream) override;

@@ -183,7 +183,7 @@ class ProcessGroupBKCL : public ProcessGroupWithStream {

   std::shared_ptr<ProcessGroup::Task> Collective(
       std::function<void(phi::distributed::BKCLCommContext*, XPUStream)> fn,
-      const std::vector<phi::DenseTensor>& tensors,
+      const std::vector<DenseTensor>& tensors,
       CommType comm_type,
       bool sync_op,
       bool use_calc_stream);