diff --git a/paddle/fluid/distributed/collective/common.cc b/paddle/fluid/distributed/collective/common.cc index 197d24a106c5e3..16776577e63380 100644 --- a/paddle/fluid/distributed/collective/common.cc +++ b/paddle/fluid/distributed/collective/common.cc @@ -16,7 +16,7 @@ namespace paddle::distributed { -std::vector GetPlaceList(const std::vector& tensors) { +std::vector GetPlaceList(const std::vector& tensors) { std::vector places; places.reserve(tensors.size()); for (auto& tensor : tensors) { @@ -41,14 +41,14 @@ std::string GetKeyFromPlaces(const std::vector& places) { std::string GetKeyFromPlace(const Place& place) { return place.DebugString(); } -bool CheckTensorsInCudaPlace(const std::vector& tensors) { +bool CheckTensorsInCudaPlace(const std::vector& tensors) { return std::all_of( tensors.cbegin(), tensors.cend(), [&](const phi::DenseTensor& t) { return phi::is_gpu_place(t.place()); }); } -bool CheckTensorsInCustomPlace(const std::vector& tensors, +bool CheckTensorsInCustomPlace(const std::vector& tensors, const std::string& dev_type) { return std::all_of( tensors.cbegin(), tensors.cend(), [&](const phi::DenseTensor& t) { @@ -57,7 +57,7 @@ bool CheckTensorsInCustomPlace(const std::vector& tensors, }); } -bool CheckTensorsInXPUPlace(const std::vector& tensors) { +bool CheckTensorsInXPUPlace(const std::vector& tensors) { return std::all_of( tensors.cbegin(), tensors.cend(), [&](const phi::DenseTensor& t) { return phi::is_xpu_place(t.place()); diff --git a/paddle/fluid/distributed/collective/common.h b/paddle/fluid/distributed/collective/common.h index ee69697f646f79..cf7e0d67f8b8a2 100644 --- a/paddle/fluid/distributed/collective/common.h +++ b/paddle/fluid/distributed/collective/common.h @@ -21,18 +21,18 @@ namespace distributed { using Place = phi::Place; // Get the list of devices from list of tensors -std::vector GetPlaceList(const std::vector& tensors); +std::vector GetPlaceList(const std::vector& tensors); // Get the deviceList String from the list of devices std::string GetKeyFromPlaces(const std::vector& places); // Get the device string from one device std::string GetKeyFromPlace(const Place& place); -bool CheckTensorsInCudaPlace(const std::vector& tensors); +bool CheckTensorsInCudaPlace(const std::vector& tensors); -bool CheckTensorsInCustomPlace(const std::vector& tensors, +bool CheckTensorsInCustomPlace(const std::vector& tensors, const std::string& dev_type); -bool CheckTensorsInXPUPlace(const std::vector& tensors); +bool CheckTensorsInXPUPlace(const std::vector& tensors); } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/collective/mpi_tools.cc b/paddle/fluid/distributed/collective/mpi_tools.cc index 6521e872a1f96f..62d0d736a12520 100644 --- a/paddle/fluid/distributed/collective/mpi_tools.cc +++ b/paddle/fluid/distributed/collective/mpi_tools.cc @@ -39,7 +39,7 @@ MPI_Op ToMPIType(ReduceOp reduction) { // NOTE: MPI does not support CUDA aware now. 
bool CheckMpiCudaAware() { return false; } -void CheckValidInputs(const std::vector& tensors) { +void CheckValidInputs(const std::vector& tensors) { PADDLE_ENFORCE_EQ( tensors.size() == 1, true, diff --git a/paddle/fluid/distributed/collective/mpi_tools.h b/paddle/fluid/distributed/collective/mpi_tools.h index bac8e925797b38..6820ca25b81733 100644 --- a/paddle/fluid/distributed/collective/mpi_tools.h +++ b/paddle/fluid/distributed/collective/mpi_tools.h @@ -46,7 +46,7 @@ MPI_Op ToMPIType(ReduceOp reduction); bool CheckMpiCudaAware(); -void CheckValidInputs(const std::vector& tensors); +void CheckValidInputs(const std::vector& tensors); } // namespace mpi } // namespace distributed diff --git a/paddle/fluid/distributed/collective/process_group.h b/paddle/fluid/distributed/collective/process_group.h index 61e0a568cccfcc..72ca9b91d59746 100644 --- a/paddle/fluid/distributed/collective/process_group.h +++ b/paddle/fluid/distributed/collective/process_group.h @@ -57,7 +57,7 @@ static void CheckTensorContiguous(const phi::DenseTensor& tensor) { } } -static void CheckTensorContiguous(const std::vector& inputs) { +static void CheckTensorContiguous(const std::vector& inputs) { for (const auto& tensor : inputs) { if (!tensor.meta().is_contiguous()) { PADDLE_THROW( @@ -66,7 +66,7 @@ static void CheckTensorContiguous(const std::vector& inputs) { } } -static void CheckTensorSamePlace(const std::vector& tensors) { +static void CheckTensorSamePlace(const std::vector& tensors) { for (const auto& tensor : tensors) { if (tensor.place() != tensors[0].place()) { PADDLE_THROW( @@ -77,7 +77,7 @@ static void CheckTensorSamePlace(const std::vector& tensors) { } static std::vector GetAllToAllSplitSizes( - const std::vector& tensors) { + const std::vector& tensors) { std::vector split_sizes(tensors.size()); std::transform(tensors.begin(), tensors.end(), @@ -87,7 +87,7 @@ static std::vector GetAllToAllSplitSizes( } static std::vector GetTensorPtrs( - const std::vector& tensors) { + const std::vector& tensors) { std::vector tensor_ptrs(tensors.size()); std::transform(tensors.begin(), tensors.end(), @@ -96,7 +96,7 @@ static std::vector GetTensorPtrs( return tensor_ptrs; } -static int64_t GetTensorNumel(const std::vector& tensors) { +static int64_t GetTensorNumel(const std::vector& tensors) { return std::accumulate(tensors.begin(), tensors.end(), int64_t(0), diff --git a/paddle/fluid/distributed/collective/process_group_bkcl.cc b/paddle/fluid/distributed/collective/process_group_bkcl.cc index d799dee8a4144c..c80479e2aed93e 100644 --- a/paddle/fluid/distributed/collective/process_group_bkcl.cc +++ b/paddle/fluid/distributed/collective/process_group_bkcl.cc @@ -108,7 +108,7 @@ std::shared_ptr ProcessGroupBKCL::Recv( bool sync_op, bool use_calc_stream) { // numel > 0 indicates the tensor need to be sliced - phi::DenseTensor partial_tensor; + DenseTensor partial_tensor; if (numel > 0) { partial_tensor = GetPartialTensor(*tensor, offset, numel); tensor = &partial_tensor; @@ -246,7 +246,7 @@ void ProcessGroupBKCL::SyncCalcStream(const Place& place) { std::shared_ptr ProcessGroupBKCL::Collective( std::function fn, - const std::vector& tensors, + const std::vector& tensors, CommType op_type, bool sync_op, bool use_calc_stream) { @@ -309,7 +309,7 @@ std::shared_ptr ProcessGroupBKCL::Collective( CommType op_type, bool sync_op, bool use_calc_stream) { - const std::vector tensors = {tensor}; + const std::vector tensors = {tensor}; return Collective(fn, tensors, op_type, sync_op, use_calc_stream); } @@ -505,15 +505,15 @@ 
std::shared_ptr ProcessGroupBKCL::AllToAll( new paddle::experimental::DefaultAllocator(place)); phi::DenseTensorMeta meta(phi::DataType::INT64, phi::DDim{nranks}); #if defined(PADDLE_WITH_FLAGCX) - phi::DenseTensor in_size_tensor = {allocator_cpu.get(), meta}; - phi::DenseTensor in_offset_tensor = {allocator_cpu.get(), meta}; - phi::DenseTensor out_size_tensor = {allocator_cpu.get(), meta}; - phi::DenseTensor out_offset_tensor = {allocator_cpu.get(), meta}; + DenseTensor in_size_tensor = {allocator_cpu.get(), meta}; + DenseTensor in_offset_tensor = {allocator_cpu.get(), meta}; + DenseTensor out_size_tensor = {allocator_cpu.get(), meta}; + DenseTensor out_offset_tensor = {allocator_cpu.get(), meta}; #else - phi::DenseTensor in_size_tensor = {allocator.get(), meta}; - phi::DenseTensor in_offset_tensor = {allocator.get(), meta}; - phi::DenseTensor out_size_tensor = {allocator.get(), meta}; - phi::DenseTensor out_offset_tensor = {allocator.get(), meta}; + DenseTensor in_size_tensor = {allocator.get(), meta}; + DenseTensor in_offset_tensor = {allocator.get(), meta}; + DenseTensor out_size_tensor = {allocator.get(), meta}; + DenseTensor out_offset_tensor = {allocator.get(), meta}; #endif #if defined(PADDLE_WITH_FLAGCX) @@ -580,8 +580,8 @@ std::shared_ptr ProcessGroupBKCL::AllToAll( } std::shared_ptr ProcessGroupBKCL::AllToAll( - std::vector* out_tensors, - const std::vector& in_tensors, + std::vector* out_tensors, + const std::vector& in_tensors, bool sync_op, bool use_calc_stream) { CheckTensorContiguous(in_tensors); @@ -691,20 +691,20 @@ std::shared_ptr ProcessGroupBKCL::AllToAll( phi::DenseTensorMeta split_meta(phi::DataType::INT64, phi::DDim{nranks}); - phi::DenseTensor concated_in_tensor = {allocator.get(), - concated_in_tensor_meta}; - phi::DenseTensor concated_out_tensor = {allocator.get(), - concated_out_tensor_meta}; + DenseTensor concated_in_tensor = {allocator.get(), + concated_in_tensor_meta}; + DenseTensor concated_out_tensor = {allocator.get(), + concated_out_tensor_meta}; #if defined(PADDLE_WITH_FLAGCX) - phi::DenseTensor in_size_tensor = {allocator_cpu.get(), split_meta}; - phi::DenseTensor in_offset_tensor = {allocator_cpu.get(), split_meta}; - phi::DenseTensor out_size_tensor = {allocator_cpu.get(), split_meta}; - phi::DenseTensor out_offset_tensor = {allocator_cpu.get(), split_meta}; + DenseTensor in_size_tensor = {allocator_cpu.get(), split_meta}; + DenseTensor in_offset_tensor = {allocator_cpu.get(), split_meta}; + DenseTensor out_size_tensor = {allocator_cpu.get(), split_meta}; + DenseTensor out_offset_tensor = {allocator_cpu.get(), split_meta}; #else - phi::DenseTensor in_size_tensor = {allocator.get(), split_meta}; - phi::DenseTensor in_offset_tensor = {allocator.get(), split_meta}; - phi::DenseTensor out_size_tensor = {allocator.get(), split_meta}; - phi::DenseTensor out_offset_tensor = {allocator.get(), split_meta}; + DenseTensor in_size_tensor = {allocator.get(), split_meta}; + DenseTensor in_offset_tensor = {allocator.get(), split_meta}; + DenseTensor out_size_tensor = {allocator.get(), split_meta}; + DenseTensor out_offset_tensor = {allocator.get(), split_meta}; #endif if (in_numel_sum > 0) { @@ -965,7 +965,7 @@ std::shared_ptr ProcessGroupBKCL::Barrier( auto allocator = std::unique_ptr( new paddle::experimental::DefaultAllocator(place)); phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1}); - phi::DenseTensor barrier_tensor{allocator.get(), meta}; + DenseTensor barrier_tensor{allocator.get(), meta}; auto task = AllReduce(&barrier_tensor, barrier_tensor, 
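The recurring edit across these collective source and header files is a pure spelling change: phi::DenseTensor is shortened to DenseTensor inside paddle::distributed (in vectors, shared_ptrs, signatures, and locals alike), with no behavioral difference. For the unqualified spelling to compile, the name must be visible in that namespace. Below is a minimal sketch of the kind of declaration such a change relies on, assuming a using-declaration in a shared header; the exact header and mechanism are not shown in this excerpt.

// Hypothetical sketch, not part of the patch above: one way to make the
// unqualified spelling DenseTensor valid inside paddle::distributed.
#include "paddle/phi/core/dense_tensor.h"

namespace paddle {
namespace distributed {

// Pull phi::DenseTensor into this namespace so signatures and locals can be
// written as DenseTensor instead of phi::DenseTensor.
using phi::DenseTensor;

}  // namespace distributed
}  // namespace paddle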
diff --git a/paddle/fluid/distributed/collective/process_group_bkcl.h b/paddle/fluid/distributed/collective/process_group_bkcl.h index e46229ea453572..54f96d4b8b76ed 100644 --- a/paddle/fluid/distributed/collective/process_group_bkcl.h +++ b/paddle/fluid/distributed/collective/process_group_bkcl.h @@ -112,8 +112,8 @@ class ProcessGroupBKCL : public ProcessGroupWithStream { bool use_calc_stream) override; std::shared_ptr AllToAll( - std::vector* out_tensors, - const std::vector& in_tensors, + std::vector* out_tensors, + const std::vector& in_tensors, bool sync_op, bool use_calc_stream) override; @@ -183,7 +183,7 @@ class ProcessGroupBKCL : public ProcessGroupWithStream { std::shared_ptr Collective( std::function fn, - const std::vector& tensors, + const std::vector& tensors, CommType comm_type, bool sync_op, bool use_calc_stream); diff --git a/paddle/fluid/distributed/collective/process_group_custom.cc b/paddle/fluid/distributed/collective/process_group_custom.cc index 0f5ecb2f262e05..4a40d31463b71a 100644 --- a/paddle/fluid/distributed/collective/process_group_custom.cc +++ b/paddle/fluid/distributed/collective/process_group_custom.cc @@ -51,11 +51,10 @@ ProcessGroupCustom::XCCLTask::XCCLTask(const Place& place, comm_event_->Init(task_place_); } -ProcessGroupCustom::XCCLTask::XCCLTask( - const std::vector& places, - int rank, - CommType CommType, - const std::vector& inputs) +ProcessGroupCustom::XCCLTask::XCCLTask(const std::vector& places, + int rank, + CommType CommType, + const std::vector& inputs) : TaskStream(rank, inputs, CommType), task_place_(places[0]), comm_event_(std::make_unique()) { @@ -290,7 +289,7 @@ std::shared_ptr ProcessGroupCustom::AllToAll( int64_t out_row_size = out_dim[0] == 0 ? 0 : out_tensor->numel() / out_dim[0]; int64_t in_offset = 0, in_numel = 0, out_offset = 0, out_numel = 0; - phi::DenseTensor input_partial, output_partial; + DenseTensor input_partial, output_partial; VLOG(3) << "[AllToAll] " << "sendbuff: " << in_tensor.data() @@ -355,7 +354,7 @@ std::shared_ptr ProcessGroupCustom::Barrier( auto allocator = std::unique_ptr( new paddle::experimental::DefaultAllocator(place)); phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1}); - phi::DenseTensor barrier_tensor{allocator.get(), meta}; + DenseTensor barrier_tensor{allocator.get(), meta}; auto task = AllReduce(&barrier_tensor, barrier_tensor, @@ -460,7 +459,7 @@ std::shared_ptr ProcessGroupCustom::Scatter( int64_t numel = in_tensor.numel() / size_; if (rank_ == opts.root_rank) { int64_t offset = 0; - phi::DenseTensor partial_tensor; + DenseTensor partial_tensor; for (auto i = 0; i < size_; i++) { partial_tensor = GetPartialTensor(in_tensor, offset, numel); if (i != rank_) { @@ -494,7 +493,7 @@ std::shared_ptr ProcessGroupCustom::Gather( CheckTensorContiguous(in_tensor); CheckTensorContiguous(*out_tensor); - std::vector partial_tensors; + std::vector partial_tensors; if (rank_ == opts.root_rank) { partial_tensors.reserve(size_); size_t offset = 0; @@ -508,7 +507,7 @@ std::shared_ptr ProcessGroupCustom::Gather( } std::shared_ptr ProcessGroupCustom::Gather( - std::vector* gather_tensors_ptr, + std::vector* gather_tensors_ptr, const phi::DenseTensor& in_tensor, const GatherOptions& opts, bool sync_op, @@ -559,7 +558,7 @@ std::shared_ptr ProcessGroupCustom::Recv( bool sync_op, bool use_calc_stream) { // numel > 0 indicates the tensor need to be sliced - phi::DenseTensor partial_tensor; + DenseTensor partial_tensor; if (numel > 0) { partial_tensor = GetPartialTensor(*tensor, offset, numel); tensor = 
&partial_tensor; @@ -675,7 +674,7 @@ void ProcessGroupCustom::SyncCalcStream(const Place& place) { std::shared_ptr ProcessGroupCustom::RunFnInXCCLEnv( std::function fn, - const std::vector& tensors, + const std::vector& tensors, CommType comm_type, bool sync_op, bool use_calc_stream) { @@ -722,7 +721,7 @@ std::shared_ptr ProcessGroupCustom::RunFnInXCCLEnv( CommType comm_type, bool sync_op, bool use_calc_stream) { - const std::vector tensors = {tensor}; + const std::vector tensors = {tensor}; return RunFnInXCCLEnv(fn, tensors, comm_type, sync_op, use_calc_stream); } @@ -742,7 +741,7 @@ std::shared_ptr ProcessGroupCustom::CreateTask( std::vector places, int rank, CommType comm_type, - const std::vector& inputs) { + const std::vector& inputs) { return std::make_shared( places, rank, comm_type, inputs); } @@ -814,8 +813,8 @@ void ProcessGroupCustom::CreateXCCLManagerCache( template std::shared_ptr ProcessGroupCustom::Collective( - std::vector& inputs, - std::vector& outputs, + std::vector& inputs, + std::vector& outputs, bool use_calc_stream, Fn fn, CommType op_type) { @@ -870,10 +869,7 @@ std::shared_ptr ProcessGroupCustom::Collective( template std::shared_ptr ProcessGroupCustom::PointToPoint( - std::vector& tensors, - Fn fn, - int dst_rank, - CommType op_type) { + std::vector& tensors, Fn fn, int dst_rank, CommType op_type) { CheckTensorContiguous(tensors); const auto places = GetPlaceList(tensors); @@ -923,8 +919,8 @@ std::shared_ptr ProcessGroupCustom::PointToPoint( } std::shared_ptr ProcessGroupCustom::AllReduce( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const AllreduceOptions& opts, bool use_calc_stream, bool sync_op UNUSED) { @@ -954,8 +950,8 @@ std::shared_ptr ProcessGroupCustom::AllReduce( } std::shared_ptr ProcessGroupCustom::Broadcast( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const BroadcastOptions& opts) { CheckTensorContiguous(in_tensors); CheckTensorContiguous(out_tensors); @@ -982,7 +978,7 @@ std::shared_ptr ProcessGroupCustom::Broadcast( } inline void CheckTensorsInDifferentDevices( - const std::vector& tensors, const size_t num_devices) { + const std::vector& tensors, const size_t num_devices) { PADDLE_ENFORCE_EQ( tensors.empty(), false, @@ -1010,7 +1006,7 @@ inline void CheckTensorsInDifferentDevices( } std::shared_ptr ProcessGroupCustom::Send( - std::vector& tensors, int dst_rank) { + std::vector& tensors, int dst_rank) { CheckTensorContiguous(tensors); CheckTensorsInDifferentDevices(tensors, static_cast(GetSize())); @@ -1030,7 +1026,7 @@ std::shared_ptr ProcessGroupCustom::Send( } std::shared_ptr ProcessGroupCustom::Recv( - std::vector& tensors, int src_rank) { + std::vector& tensors, int src_rank) { CheckTensorContiguous(tensors); CheckTensorsInDifferentDevices(tensors, static_cast(GetSize())); @@ -1051,8 +1047,8 @@ std::shared_ptr ProcessGroupCustom::Recv( } std::shared_ptr ProcessGroupCustom::AllGather( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, bool use_calc_stream, bool sync_op UNUSED) { CheckTensorContiguous(in_tensors); @@ -1081,8 +1077,8 @@ std::shared_ptr ProcessGroupCustom::AllGather( } std::shared_ptr ProcessGroupCustom::AllToAll( - std::vector& in_tensors, - std::vector& out_tensors) { + std::vector& in_tensors, + std::vector& out_tensors) { CheckTensorContiguous(in_tensors); CheckTensorContiguous(out_tensors); @@ -1134,8 +1130,8 @@ std::shared_ptr 
ProcessGroupCustom::AllToAll( } std::shared_ptr ProcessGroupCustom::AllToAll( - std::vector* out_tensors, - const std::vector& in_tensors, + std::vector* out_tensors, + const std::vector& in_tensors, bool sync_op, bool use_calc_stream) { CheckTensorContiguous(in_tensors); @@ -1214,8 +1210,8 @@ std::shared_ptr ProcessGroupCustom::AllToAll( } std::shared_ptr ProcessGroupCustom::Reduce( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const ReduceOptions& opts) { CheckTensorContiguous(in_tensors); CheckTensorContiguous(out_tensors); @@ -1243,8 +1239,8 @@ std::shared_ptr ProcessGroupCustom::Reduce( } std::shared_ptr ProcessGroupCustom::Scatter( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const ScatterOptions& opts) { CheckTensorContiguous(in_tensors); CheckTensorContiguous(out_tensors); diff --git a/paddle/fluid/distributed/collective/process_group_custom.h b/paddle/fluid/distributed/collective/process_group_custom.h index 456d646e3da574..c5cf5ca8ea803e 100644 --- a/paddle/fluid/distributed/collective/process_group_custom.h +++ b/paddle/fluid/distributed/collective/process_group_custom.h @@ -58,7 +58,7 @@ class ProcessGroupCustom final : public ProcessGroupWithStream { XCCLTask(const std::vector& places, int rank, CommType CommType, - const std::vector& inputs); + const std::vector& inputs); private: bool block_cpu_in_wait_{false}; @@ -113,8 +113,8 @@ class ProcessGroupCustom final : public ProcessGroupWithStream { bool use_calc_stream) override; std::shared_ptr AllToAll( - std::vector* out_tensors, - const std::vector& in_tensors, + std::vector* out_tensors, + const std::vector& in_tensors, bool sync_op, bool use_calc_stream) override; @@ -154,7 +154,7 @@ class ProcessGroupCustom final : public ProcessGroupWithStream { bool use_calc_stream) override; std::shared_ptr Gather( - std::vector* gather_tensors_ptr, + std::vector* gather_tensors_ptr, const phi::DenseTensor& in_tensor, const GatherOptions& opts, bool sync_op, @@ -185,42 +185,42 @@ class ProcessGroupCustom final : public ProcessGroupWithStream { phi::distributed::XCCLCommContext* GetOrCreateCommContext(const Place& place); std::shared_ptr AllReduce( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const AllreduceOptions& = AllreduceOptions(), bool use_calc_stream = false, bool sync_op = false) override; // TODO(sunyilun): methods below will be removed later std::shared_ptr Broadcast( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const BroadcastOptions& = BroadcastOptions()) override; - std::shared_ptr Send( - std::vector& tensors, int dst_rank) override; + std::shared_ptr Send(std::vector& tensors, + int dst_rank) override; - std::shared_ptr Recv( - std::vector& tensors, int src_rank) override; + std::shared_ptr Recv(std::vector& tensors, + int src_rank) override; std::shared_ptr AllGather( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, bool use_calc_stream = false, bool sync_op = false) override; std::shared_ptr AllToAll( - std::vector& in_tensors, - std::vector& out_tensors) override; + std::vector& in_tensors, + std::vector& out_tensors) override; std::shared_ptr Reduce( - std::vector& tensors, - std::vector& out_tensors, + std::vector& tensors, + std::vector& out_tensors, const ReduceOptions& opts) override; 
std::shared_ptr Scatter( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const ScatterOptions& opts) override; private: @@ -239,7 +239,7 @@ class ProcessGroupCustom final : public ProcessGroupWithStream { std::shared_ptr RunFnInXCCLEnv( std::function fn, - const std::vector& tensors, + const std::vector& tensors, CommType comm_type, bool sync_op, bool use_calc_stream); @@ -256,19 +256,19 @@ class ProcessGroupCustom final : public ProcessGroupWithStream { std::vector places, int rank, CommType op_type, - const std::vector& inputs); + const std::vector& inputs); template std::shared_ptr Collective( - std::vector& inputs, // NOLINT - std::vector& outputs, // NOLINT + std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT bool use_calc_stream, Fn fn, CommType op_type); template std::shared_ptr PointToPoint( - std::vector& tensors, // NOLINT + std::vector& tensors, // NOLINT Fn fn, int dst_rank, CommType op_type); diff --git a/paddle/fluid/distributed/collective/process_group_flagcx.cc b/paddle/fluid/distributed/collective/process_group_flagcx.cc index 87f3a14d0116b7..b15c57502c8c1d 100644 --- a/paddle/fluid/distributed/collective/process_group_flagcx.cc +++ b/paddle/fluid/distributed/collective/process_group_flagcx.cc @@ -304,7 +304,7 @@ std::shared_ptr ProcessGroupFlagcx::AllToAll( int64_t out_row_size = out_dim[0] == 0 ? 0 : out_tensor->numel() / out_dim[0]; int64_t in_offset = 0, in_numel = 0, out_offset = 0, out_numel = 0; - phi::DenseTensor input_partial, output_partial; + DenseTensor input_partial, output_partial; VLOG(3) << "[AllToAll] " << "sendbuff: " << in_tensor.data() @@ -352,8 +352,8 @@ std::shared_ptr ProcessGroupFlagcx::AllToAll( } std::shared_ptr ProcessGroupFlagcx::AllToAll( - std::vector* out_tensors, - const std::vector& in_tensors, + std::vector* out_tensors, + const std::vector& in_tensors, bool sync_op, bool use_calc_stream) { CheckTensorContiguous(in_tensors); @@ -431,7 +431,7 @@ std::shared_ptr ProcessGroupFlagcx::Barrier( auto allocator = std::unique_ptr( new paddle::experimental::DefaultAllocator(place)); phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1}); - phi::DenseTensor barrier_tensor{allocator.get(), meta}; + DenseTensor barrier_tensor{allocator.get(), meta}; VLOG(3) << "[Barrier] " << "barrier opt: " << opts.device_id; @@ -582,7 +582,7 @@ std::shared_ptr ProcessGroupFlagcx::Scatter( int64_t numel = in_tensor.numel() / size_; if (rank_ == opts.root_rank) { int64_t offset = 0; - phi::DenseTensor partial_tensor; + DenseTensor partial_tensor; comm_context->GroupStart(); for (auto i = 0; i < size_; i++) { partial_tensor = GetPartialTensor(in_tensor, offset, numel); @@ -610,7 +610,7 @@ std::shared_ptr ProcessGroupFlagcx::Gather( CheckTensorContiguous(in_tensor); CheckTensorContiguous(*out_tensor); - std::vector partial_tensors; + std::vector partial_tensors; if (rank_ == opts.root_rank) { partial_tensors.reserve(size_); size_t offset = 0; @@ -626,7 +626,7 @@ std::shared_ptr ProcessGroupFlagcx::Gather( } std::shared_ptr ProcessGroupFlagcx::Gather( - std::vector* gather_tensors_ptr, + std::vector* gather_tensors_ptr, const phi::DenseTensor& in_tensor, const GatherOptions& opts, bool sync_op, @@ -679,7 +679,7 @@ std::shared_ptr ProcessGroupFlagcx::Recv( bool use_calc_stream) { CheckTensorContiguous(*tensor); // numel > 0 indicates the tensor need to be sliced - phi::DenseTensor partial_tensor; + DenseTensor partial_tensor; if (numel > 0) { partial_tensor = GetPartialTensor(*tensor, 
offset, numel); tensor = &partial_tensor; @@ -870,7 +870,7 @@ void ProcessGroupFlagcx::EagerConnectRingExchange() { std::shared_ptr ProcessGroupFlagcx::Collective( std::function fn, - const std::vector& tensors, + const std::vector& tensors, CommType comm_type, bool sync_op, bool use_calc_stream) { @@ -931,7 +931,7 @@ std::shared_ptr ProcessGroupFlagcx::Collective( } else { for (size_t i = 0; i < tensors.size(); ++i) { coalescing_tensors_.emplace_back( - std::make_shared(tensors[i])); + std::make_shared(tensors[i])); } coalescing_place_keys_.push_back(key); } @@ -951,7 +951,7 @@ std::shared_ptr ProcessGroupFlagcx::Collective( CommType comm_type, bool sync_op, bool use_calc_stream) { - const std::vector tensors = {tensor}; + const std::vector tensors = {tensor}; return Collective(fn, tensors, comm_type, sync_op, use_calc_stream); } @@ -1020,8 +1020,7 @@ std::shared_ptr ProcessGroupFlagcx::Point2Point( allocation_stream_pairs_.emplace_back( tensor.Holder(), *reinterpret_cast(flagcx_stream)); } else { - coalescing_tensors_.emplace_back( - std::make_shared(tensor)); + coalescing_tensors_.emplace_back(std::make_shared(tensor)); coalescing_place_keys_.push_back(key); } } diff --git a/paddle/fluid/distributed/collective/process_group_flagcx.h b/paddle/fluid/distributed/collective/process_group_flagcx.h index e03ad43ebb22b8..27a8411a547f6f 100644 --- a/paddle/fluid/distributed/collective/process_group_flagcx.h +++ b/paddle/fluid/distributed/collective/process_group_flagcx.h @@ -59,7 +59,7 @@ class ProcessGroupFlagcx final : public ProcessGroupWithStream { FlagcxTask(const std::vector& places, int rank, CommType CommType, - const std::vector& inputs); + const std::vector& inputs); void RemoveHolderStreamInGroup(); @@ -118,8 +118,8 @@ class ProcessGroupFlagcx final : public ProcessGroupWithStream { bool use_calc_stream) override; std::shared_ptr AllToAll( - std::vector* out_tensors, - const std::vector& in_tensors, + std::vector* out_tensors, + const std::vector& in_tensors, bool sync_op, bool use_calc_stream) override; @@ -159,7 +159,7 @@ class ProcessGroupFlagcx final : public ProcessGroupWithStream { bool use_calc_stream) override; std::shared_ptr Gather( - std::vector* gather_tensors_ptr, + std::vector* gather_tensors_ptr, const phi::DenseTensor& in_tensor, const GatherOptions& opts, bool sync_op, @@ -216,7 +216,7 @@ class ProcessGroupFlagcx final : public ProcessGroupWithStream { std::shared_ptr Collective( std::function fn, - const std::vector& tensors, + const std::vector& tensors, CommType comm_type, bool sync_op, bool use_calc_stream); @@ -285,7 +285,7 @@ class ProcessGroupFlagcx final : public ProcessGroupWithStream { // For coalescing tensors processing (eg. 
batch_isend_irecv) bool is_coalescing_{false}; - std::vector> coalescing_tensors_; + std::vector> coalescing_tensors_; std::vector coalescing_place_keys_; }; diff --git a/paddle/fluid/distributed/collective/process_group_gloo.cc b/paddle/fluid/distributed/collective/process_group_gloo.cc index 9e60c6ed4d9c05..b1617710ae3d14 100644 --- a/paddle/fluid/distributed/collective/process_group_gloo.cc +++ b/paddle/fluid/distributed/collective/process_group_gloo.cc @@ -101,8 +101,7 @@ T* get_data(phi::DenseTensor& tensor) { // NOLINT } template -std::vector get_multi_data( - std::vector& tensors) { // NOLINT +std::vector get_multi_data(std::vector& tensors) { // NOLINT std::vector ret; ret.reserve(tensors.size()); for (auto& tensor : tensors) { @@ -122,14 +121,14 @@ void set_input(P& opts, phi::DenseTensor& tensor) { // NOLINT } template -void set_outputs(P& opts, // NOLINT - std::vector& tensors) { // NOLINT +void set_outputs(P& opts, // NOLINT + std::vector& tensors) { // NOLINT opts.setOutputs(get_multi_data(tensors), tensors[0].numel()); } template -void set_inputs(P& opts, // NOLINT - std::vector& tensors) { // NOLINT +void set_inputs(P& opts, // NOLINT + std::vector& tensors) { // NOLINT opts.setInputs(get_multi_data(tensors), tensors[0].numel()); } @@ -148,8 +147,9 @@ void set_inputs_for_scatter(P& opts, // NOLINT opts.setInputs(ret, tensor.numel() / nranks); } -ProcessGroupGloo::GlooTask::GlooTask( - int rank, const std::vector& inputs, CommType comm_type) +ProcessGroupGloo::GlooTask::GlooTask(int rank, + const std::vector& inputs, + CommType comm_type) : ProcessGroup::Task(rank, inputs, comm_type) {} ProcessGroupGloo::ProcessGroupGloo( @@ -168,8 +168,8 @@ ProcessGroupGloo::ProcessGroupGloo( class BroadcastGlooTask : public ProcessGroupGloo::GlooTask { public: BroadcastGlooTask(phi::distributed::GlooCommContext* comm_context, - std::vector& inputs, // NOLINT - std::vector& outputs, // NOLINT + std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT int rank, int root, uint32_t tag) @@ -185,8 +185,8 @@ class BroadcastGlooTask : public ProcessGroupGloo::GlooTask { private: phi::distributed::GlooCommContext* _comm_context; const int _root; - std::vector _inputs{}; - std::vector _outputs{}; + std::vector _inputs{}; + std::vector _outputs{}; const uint32_t _tag; void _do_broadcast(phi::DenseTensor& in, phi::DenseTensor& out) { // NOLINT @@ -200,21 +200,21 @@ std::shared_ptr ProcessGroupGloo::Broadcast( const phi::DenseTensor& in_tensor, const BroadcastOptions& opts, bool sync_op) { - std::vector in_wrapper{in_tensor}; - std::vector out_wrapper{*out_tensor}; + std::vector in_wrapper{in_tensor}; + std::vector out_wrapper{*out_tensor}; return Broadcast(in_wrapper, out_wrapper, opts, true); } std::shared_ptr ProcessGroupGloo::Broadcast( - std::vector& inputs, - std::vector& outputs, + std::vector& inputs, + std::vector& outputs, const BroadcastOptions& opts) { return Broadcast(inputs, outputs, opts, true); } std::shared_ptr ProcessGroupGloo::Broadcast( - std::vector& inputs, - std::vector& outputs, + std::vector& inputs, + std::vector& outputs, const BroadcastOptions& opts, bool sync_op) { CheckTensorContiguous(inputs); @@ -233,7 +233,7 @@ std::shared_ptr ProcessGroupGloo::Broadcast( class SendGlooTask : public ProcessGroupGloo::GlooTask { public: SendGlooTask(phi::distributed::GlooCommContext* comm_context, - std::vector* inputs, + std::vector* inputs, int rank, int dst_rank, uint32_t tag) @@ -247,23 +247,23 @@ class SendGlooTask : public ProcessGroupGloo::GlooTask { private: 
phi::distributed::GlooCommContext* _comm_context; - std::vector _inputs; + std::vector _inputs; int _dst; uint32_t _tag; - void _do_send(std::vector& in) { // NOLINT + void _do_send(std::vector& in) { // NOLINT _comm_context->Send(in[0], _dst, _tag); } }; std::shared_ptr ProcessGroupGloo::Send( const phi::DenseTensor& tensor, int dst_rank, bool sync_op) { - std::vector in_wrapper{tensor}; + std::vector in_wrapper{tensor}; return Send(in_wrapper, dst_rank); } std::shared_ptr ProcessGroupGloo::Send( - std::vector& inputs, int dst_rank) { + std::vector& inputs, int dst_rank) { CheckTensorContiguous(inputs); std::unique_ptr task; auto tag = next_tag(); @@ -278,7 +278,7 @@ std::shared_ptr ProcessGroupGloo::Send( class RecvGlooTask : public ProcessGroupGloo::GlooTask { public: RecvGlooTask(phi::distributed::GlooCommContext* comm_context, - std::vector* outputs, + std::vector* outputs, int rank, int src_rank, uint32_t tag) @@ -292,23 +292,23 @@ class RecvGlooTask : public ProcessGroupGloo::GlooTask { private: phi::distributed::GlooCommContext* _comm_context; - std::vector _outputs; + std::vector _outputs; const int _src; const uint32_t _tag; - void _do_recv(std::vector& out) { // NOLINT + void _do_recv(std::vector& out) { // NOLINT _comm_context->Recv(&(out[0]), _src, _tag); } }; std::shared_ptr ProcessGroupGloo::Recv( phi::DenseTensor* tensor, int src_rank, bool sync_op) { - std::vector in_wrapper{*tensor}; + std::vector in_wrapper{*tensor}; return Recv(in_wrapper, src_rank); } std::shared_ptr ProcessGroupGloo::Recv( - std::vector& outputs, int src_rank) { + std::vector& outputs, int src_rank) { std::unique_ptr task; auto tag = next_tag(); auto comm_context = this->GetCommContext(); @@ -323,8 +323,8 @@ class AllreduceGlooTask : public ProcessGroupGloo::GlooTask { public: AllreduceGlooTask(int rank, phi::distributed::GlooCommContext* comm_context, - std::vector& inputs, // NOLINT - std::vector& outputs, // NOLINT + std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT ReduceOp reduce_op, uint32_t tag) : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLREDUCE), @@ -338,13 +338,13 @@ class AllreduceGlooTask : public ProcessGroupGloo::GlooTask { private: phi::distributed::GlooCommContext* _comm_context; - std::vector _inputs; - std::vector _outputs; + std::vector _inputs; + std::vector _outputs; const ReduceOp _reduce_op; uint32_t _tag; - void _do_allreduce(std::vector& ins, // NOLINT - std::vector& outs) { // NOLINT + void _do_allreduce(std::vector& ins, // NOLINT + std::vector& outs) { // NOLINT _comm_context->AllReduce( &(outs[0]), ins[0], static_cast(_reduce_op), _tag); } @@ -355,21 +355,21 @@ std::shared_ptr ProcessGroupGloo::AllReduce( const phi::DenseTensor& in_tensor, const AllreduceOptions& opts, bool sync_op) { - std::vector in_wrapper{in_tensor}; - std::vector out_wrapper{*out_tensor}; + std::vector in_wrapper{in_tensor}; + std::vector out_wrapper{*out_tensor}; return AllReduce(in_wrapper, out_wrapper, opts, true); } std::shared_ptr ProcessGroupGloo::AllReduce( - std::vector& inputs, - std::vector& outputs, + std::vector& inputs, + std::vector& outputs, const AllreduceOptions& opts) { return AllReduce(inputs, outputs, opts, true); } std::shared_ptr ProcessGroupGloo::AllReduce( - std::vector& inputs, - std::vector& outputs, + std::vector& inputs, + std::vector& outputs, const AllreduceOptions& opts, bool sync_op) { CheckTensorContiguous(inputs); @@ -388,7 +388,7 @@ class BarrierGlooTask : public ProcessGroupGloo::GlooTask { public: BarrierGlooTask(int rank, 
phi::distributed::GlooCommContext* comm_context) : ProcessGroupGloo::GlooTask( - rank, std::vector{}, CommType::BARRIER), + rank, std::vector{}, CommType::BARRIER), _comm_context(comm_context) {} void Run() override { _do_barrier(); } @@ -412,8 +412,8 @@ class AllgatherGlooTask : public ProcessGroupGloo::GlooTask { public: AllgatherGlooTask(int rank, phi::distributed::GlooCommContext* comm_context, - std::vector& inputs, // NOLINT - std::vector& outputs, // NOLINT + std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT uint32_t tag) : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLGATHER), _comm_context(comm_context), @@ -425,12 +425,12 @@ class AllgatherGlooTask : public ProcessGroupGloo::GlooTask { private: phi::distributed::GlooCommContext* _comm_context; - std::vector _inputs; - std::vector _outputs; + std::vector _inputs; + std::vector _outputs; uint32_t _tag; - void _do_allgather(std::vector& in, // NOLINT - std::vector& out) { // NOLINT + void _do_allgather(std::vector& in, // NOLINT + std::vector& out) { // NOLINT _comm_context->AllGather(&(out[0]), in[0], _tag); } }; @@ -441,20 +441,20 @@ std::shared_ptr ProcessGroupGloo::AllGather( int64_t /*offset*/, int64_t /*offset*/, bool sync_op) { - std::vector in_wrapper{in_tensor}; - std::vector out_wrapper{*out_tensor}; + std::vector in_wrapper{in_tensor}; + std::vector out_wrapper{*out_tensor}; return AllGather(in_wrapper, out_wrapper, true); } std::shared_ptr ProcessGroupGloo::AllGather( - std::vector& in_tensors, - std::vector& out_tensors) { + std::vector& in_tensors, + std::vector& out_tensors) { return AllGather(in_tensors, out_tensors, true); } std::shared_ptr ProcessGroupGloo::AllGather( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, bool sync_op) { CheckTensorContiguous(in_tensors); CheckTensorContiguous(out_tensors); @@ -471,8 +471,8 @@ class ReduceGlooTask : public ProcessGroupGloo::GlooTask { public: ReduceGlooTask(int rank, phi::distributed::GlooCommContext* comm_context, - std::vector& inputs, // NOLINT - std::vector& outputs, // NOLINT + std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT ReduceOp reduce_op, int dst, uint32_t tag) @@ -488,14 +488,14 @@ class ReduceGlooTask : public ProcessGroupGloo::GlooTask { private: phi::distributed::GlooCommContext* _comm_context; - std::vector _inputs; - std::vector _outputs; + std::vector _inputs; + std::vector _outputs; const ReduceOp _reduce_op; int _dst; uint32_t _tag; - void _do_reduce(std::vector& inputs, // NOLINT - std::vector& outputs, // NOLINT + void _do_reduce(std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT int dst) { _comm_context->Reduce( &(outputs[0]), inputs[0], static_cast(_reduce_op), _dst, _tag); @@ -514,8 +514,8 @@ std::shared_ptr ProcessGroupGloo::Reduce( std::shared_ptr task; auto tag = next_tag(); auto comm_context = this->GetCommContext(); - std::vector in_wrapper{in_tensor}; - std::vector out_wrapper{*out_tensor}; + std::vector in_wrapper{in_tensor}; + std::vector out_wrapper{*out_tensor}; task = std::make_shared(rank_, comm_context, in_wrapper, @@ -528,8 +528,8 @@ std::shared_ptr ProcessGroupGloo::Reduce( } std::shared_ptr ProcessGroupGloo::Reduce( - std::vector& inputs, - std::vector& outputs, + std::vector& inputs, + std::vector& outputs, const ReduceOptions& opts) { return Reduce(&outputs[0], inputs[0], opts, true); } @@ -538,8 +538,8 @@ class ScatterGlooTask : public ProcessGroupGloo::GlooTask { public: ScatterGlooTask(int rank, 
phi::distributed::GlooCommContext* comm_context, - std::vector& inputs, // NOLINT - std::vector& outputs, // NOLINT + std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT int src, int size, uint32_t tag) @@ -555,14 +555,14 @@ class ScatterGlooTask : public ProcessGroupGloo::GlooTask { private: phi::distributed::GlooCommContext* _comm_context; - std::vector _inputs; - std::vector _outputs; + std::vector _inputs; + std::vector _outputs; int _src; int _size; uint32_t _tag; - void _do_scatter(std::vector& in, // NOLINT - std::vector& out, // NOLINT + void _do_scatter(std::vector& in, // NOLINT + std::vector& out, // NOLINT int src) { _comm_context->Scatter(&(out[0]), in[0], _src, _size, _tag); } @@ -579,8 +579,8 @@ std::shared_ptr ProcessGroupGloo::Scatter( auto tag = next_tag(); auto comm_context = this->GetCommContext(); - std::vector in_wrapper{in_tensor}; - std::vector out_wrapper{*out_tensor}; + std::vector in_wrapper{in_tensor}; + std::vector out_wrapper{*out_tensor}; task = std::make_shared( rank_, comm_context, in_wrapper, out_wrapper, opts.root_rank, size_, tag); task->Run(); @@ -588,8 +588,8 @@ std::shared_ptr ProcessGroupGloo::Scatter( } std::shared_ptr ProcessGroupGloo::Scatter( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const ScatterOptions& opts) { return Scatter(&out_tensors[0], in_tensors[0], opts, true); } @@ -613,8 +613,8 @@ class GatherGlooTask : public ProcessGroupGloo::GlooTask { private: phi::distributed::GlooCommContext* _comm_context; - phi::DenseTensor _input; - phi::DenseTensor _output; + DenseTensor _input; + DenseTensor _output; int _src; uint32_t _tag; diff --git a/paddle/fluid/distributed/collective/process_group_gloo.h b/paddle/fluid/distributed/collective/process_group_gloo.h index 98ba80e770b9a3..7374683ce3321f 100644 --- a/paddle/fluid/distributed/collective/process_group_gloo.h +++ b/paddle/fluid/distributed/collective/process_group_gloo.h @@ -34,7 +34,7 @@ class ProcessGroupGloo : public ProcessGroupWithoutStream { public std::enable_shared_from_this { public: explicit GlooTask(int rank, - const std::vector& input_tensors, + const std::vector& input_tensors, CommType comm_type); ~GlooTask() = default; @@ -136,30 +136,30 @@ class ProcessGroupGloo : public ProcessGroupWithoutStream { // TODO(sunyilun): methods below will be removed later std::shared_ptr Broadcast( - std::vector& inputs, - std::vector& outputs, + std::vector& inputs, + std::vector& outputs, const BroadcastOptions& = BroadcastOptions()) override; std::shared_ptr Broadcast( - std::vector& inputs, - std::vector& outputs, + std::vector& inputs, + std::vector& outputs, const BroadcastOptions& opts, bool sync_op) override; - std::shared_ptr Send( - std::vector& inputs, int dst_rank) override; + std::shared_ptr Send(std::vector& inputs, + int dst_rank) override; - std::shared_ptr Recv( - std::vector& outputs, int src_rank) override; + std::shared_ptr Recv(std::vector& outputs, + int src_rank) override; std::shared_ptr AllReduce( - std::vector& inputs, - std::vector& outputs, + std::vector& inputs, + std::vector& outputs, const AllreduceOptions& opts = AllreduceOptions()) override; std::shared_ptr AllReduce( - std::vector& inputs, - std::vector& outputs, + std::vector& inputs, + std::vector& outputs, const AllreduceOptions& opts, bool sync_op) override; @@ -167,22 +167,22 @@ class ProcessGroupGloo : public ProcessGroupWithoutStream { const BarrierOptions& = BarrierOptions()) override; std::shared_ptr AllGather( - 
std::vector& in_tensors, - std::vector& out_tensors) override; + std::vector& in_tensors, + std::vector& out_tensors) override; std::shared_ptr AllGather( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, bool sync_op) override; std::shared_ptr Reduce( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const ReduceOptions& opts) override; std::shared_ptr Scatter( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const ScatterOptions&) override; std::shared_ptr<::gloo::Context> get_context() { return _context; } diff --git a/paddle/fluid/distributed/collective/process_group_kernel_utils.cc b/paddle/fluid/distributed/collective/process_group_kernel_utils.cc index d241dcf6b60969..f9dead4e0b4def 100644 --- a/paddle/fluid/distributed/collective/process_group_kernel_utils.cc +++ b/paddle/fluid/distributed/collective/process_group_kernel_utils.cc @@ -23,8 +23,8 @@ namespace distributed { template <> void SplitDenseTensorByNumelWithType(const phi::XPUContext &dev_ctx, - const phi::DenseTensor &t_in, - std::vector *t_list, + const DenseTensor &t_in, + std::vector *t_list, phi::DataType type) { switch (type) { case phi::DataType::FLOAT16: @@ -57,11 +57,10 @@ void SplitDenseTensorByNumelWithType(const phi::XPUContext &dev_ctx, } template <> -void ConcatDenseTensorByNumelWithType( - const phi::XPUContext &dev_ctx, - const std::vector &t_list, - phi::DenseTensor *p_out, - phi::DataType type) { +void ConcatDenseTensorByNumelWithType(const phi::XPUContext &dev_ctx, + const std::vector &t_list, + DenseTensor *p_out, + phi::DataType type) { switch (type) { case phi::DataType::FLOAT16: ConcatDenseTensorByNumel()( @@ -94,8 +93,8 @@ void ConcatDenseTensorByNumelWithType( } void ConcatTensorByNumel(const phi::DeviceContext &dev_ctx, - const std::vector &tensor_list, - phi::DenseTensor *tensor) { + const std::vector &tensor_list, + DenseTensor *tensor) { const auto &place = dev_ctx.GetPlace(); if (phi::is_xpu_place(place)) { #ifdef PADDLE_WITH_XPU @@ -116,8 +115,8 @@ void ConcatTensorByNumel(const phi::DeviceContext &dev_ctx, } void SplitTensorByNumel(const phi::DeviceContext &dev_ctx, - const phi::DenseTensor &tensor, - std::vector *tensor_list) { + const DenseTensor &tensor, + std::vector *tensor_list) { const auto &place = dev_ctx.GetPlace(); if (phi::is_xpu_place(place)) { #ifdef PADDLE_WITH_XPU diff --git a/paddle/fluid/distributed/collective/process_group_kernel_utils.h b/paddle/fluid/distributed/collective/process_group_kernel_utils.h index 0ce52b4fafe77e..20f899475babe1 100644 --- a/paddle/fluid/distributed/collective/process_group_kernel_utils.h +++ b/paddle/fluid/distributed/collective/process_group_kernel_utils.h @@ -27,24 +27,24 @@ namespace distributed { template struct ConcatDenseTensorByNumel { void operator()(const DeviceContext &context, - const std::vector &in, - phi::DenseTensor *out) { + const std::vector &in, + DenseTensor *out) { if (out->numel() == 0) { return; } auto out_dims = common::vectorize(out->dims()); auto flattened_out_dims = {out->numel()}; - std::vector in_flatten; + std::vector in_flatten; std::vector> origin_in_dims; - phi::DenseTensor out_flatten(out->Holder(), out->meta()); + DenseTensor out_flatten(out->Holder(), out->meta()); out_flatten.Resize(flattened_out_dims); int64_t in_numel_sum = 0; for (auto &tensor : in) { if (tensor.numel() > 0) { - phi::DenseTensor tensor_flatten(tensor.Holder(), 
tensor.meta()); + DenseTensor tensor_flatten(tensor.Holder(), tensor.meta()); tensor_flatten.Resize({tensor.numel()}); in_flatten.push_back(tensor_flatten); in_numel_sum += tensor.numel(); @@ -61,11 +61,10 @@ struct ConcatDenseTensorByNumel { }; template -void ConcatDenseTensorByNumelWithType( - const DeviceContext &dev_ctx, - const std::vector &t_list, - phi::DenseTensor *p_out, - phi::DataType type) { +void ConcatDenseTensorByNumelWithType(const DeviceContext &dev_ctx, + const std::vector &t_list, + DenseTensor *p_out, + phi::DataType type) { switch (type) { case phi::DataType::BOOL: ConcatDenseTensorByNumel()(dev_ctx, t_list, p_out); @@ -108,24 +107,24 @@ void ConcatDenseTensorByNumelWithType( template struct SplitDenseTensorByNumel { void operator()(const DeviceContext &context, - const phi::DenseTensor &in, - std::vector *out) { + const DenseTensor &in, + std::vector *out) { if (in.numel() == 0) { return; } - phi::DenseTensor in_flatten(in.Holder(), in.meta()); + DenseTensor in_flatten(in.Holder(), in.meta()); in_flatten.Resize({in.numel()}); - std::vector out_flatten; - std::vector shape_refer; - std::vector out_p_list; + std::vector out_flatten; + std::vector shape_refer; + std::vector out_p_list; int64_t out_numel_sum = 0; for (auto &tensor : *out) { if (tensor.numel() > 0) { - phi::DenseTensor tensor_flatten(tensor.Holder(), tensor.meta()); + DenseTensor tensor_flatten(tensor.Holder(), tensor.meta()); tensor_flatten.Resize({tensor.numel()}); out_flatten.push_back(tensor_flatten); out_numel_sum += tensor.numel(); @@ -148,8 +147,8 @@ struct SplitDenseTensorByNumel { template void SplitDenseTensorByNumelWithType(const DeviceContext &dev_ctx, - const phi::DenseTensor &t_in, - std::vector *t_list, + const DenseTensor &t_in, + std::vector *t_list, phi::DataType type) { switch (type) { case phi::DataType::BOOL: @@ -188,11 +187,11 @@ void SplitDenseTensorByNumelWithType(const DeviceContext &dev_ctx, } void ConcatTensorByNumel(const phi::DeviceContext &dev_ctx, - const std::vector &tensor_list, - phi::DenseTensor *tensor); + const std::vector &tensor_list, + DenseTensor *tensor); void SplitTensorByNumel(const phi::DeviceContext &dev_ctx, - const phi::DenseTensor &tensor, - std::vector *tensor_list); + const DenseTensor &tensor, + std::vector *tensor_list); } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/collective/process_group_mpi.cc b/paddle/fluid/distributed/collective/process_group_mpi.cc index 699cdfe15eefd8..22957410e48a67 100644 --- a/paddle/fluid/distributed/collective/process_group_mpi.cc +++ b/paddle/fluid/distributed/collective/process_group_mpi.cc @@ -35,7 +35,7 @@ void ProcessGroupMPI::MPITask::FinishMPITaskError(std::exception_ptr eptr) { void ProcessGroupMPI::MPITask::FinishMPITask() { Finish(); } ProcessGroupMPI::MPIAsyncTask::MPIAsyncTask( - MPI_Request request, const std::vector& inputs) + MPI_Request request, const std::vector& inputs) : ProcessGroup::Task(-1, inputs, CommType::UNKNOWN), request_(request) { memset(&status_, 0, sizeof(status_)); } @@ -93,8 +93,8 @@ void ProcessGroupMPI::MPIAsyncTask::AppearException() { } void ProcessGroupMPI::MPIAsyncTask::SetOutputs( - std::vector& outputs) { - outputs_ = std::make_shared>(outputs); + std::vector& outputs) { + outputs_ = std::make_shared>(outputs); } int ProcessGroupMPI::mpi_thread_support = 0; @@ -226,8 +226,7 @@ void ProcessGroupMPI::workLoop() { } std::shared_ptr ProcessGroupMPI::Enqueue( - std::unique_ptr entry, - const std::vector& inputs) { + std::unique_ptr entry, const 
std::vector& inputs) { CheckTensorContiguous(inputs); auto task = std::make_shared(entry->dst_, inputs); std::unique_lock lock(pg_mutex); @@ -238,8 +237,8 @@ std::shared_ptr ProcessGroupMPI::Enqueue( } std::shared_ptr ProcessGroupMPI::Broadcast( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const BroadcastOptions& opts) { CheckTensorContiguous(in_tensors); CheckTensorContiguous(out_tensors); @@ -264,8 +263,8 @@ std::shared_ptr ProcessGroupMPI::Broadcast( } std::shared_ptr ProcessGroupMPI::AllReduce( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const AllreduceOptions& opts) { CheckTensorContiguous(in_tensors); @@ -296,12 +295,12 @@ std::shared_ptr ProcessGroupMPI::Barrier( }; auto entry = std::make_unique(nullptr, nullptr, std::move(runFunc)); - return Enqueue(std::move(entry), std::vector{}); + return Enqueue(std::move(entry), std::vector{}); } // NOTE: MPI_send tag set gid_ std::shared_ptr ProcessGroupMPI::Send( - std::vector& tensors, int dst_rank) { + std::vector& tensors, int dst_rank) { CheckTensorContiguous(tensors); mpi::CheckValidInputs(tensors); @@ -324,7 +323,7 @@ std::shared_ptr ProcessGroupMPI::Send( } std::shared_ptr ProcessGroupMPI::Recv( - std::vector& tensors, int src_rank) { + std::vector& tensors, int src_rank) { CheckTensorContiguous(tensors); mpi::CheckValidInputs(tensors); @@ -346,8 +345,8 @@ std::shared_ptr ProcessGroupMPI::Recv( } std::shared_ptr ProcessGroupMPI::AllGather( - std::vector& in_tensors, - std::vector& out_tensors) { + std::vector& in_tensors, + std::vector& out_tensors) { CheckTensorContiguous(in_tensors); CheckTensorContiguous(out_tensors); @@ -361,7 +360,7 @@ std::shared_ptr ProcessGroupMPI::AllGather( std::function&)> runFunc = [this](std::unique_ptr& entry) { auto data = (entry->src_)[0]; - std::vector dst = entry->dst_; + std::vector dst = entry->dst_; std::unique_lock lock(pg_global_mutex); MPI_CHECK(MPI_Allgather(data.data(), @@ -380,8 +379,8 @@ std::shared_ptr ProcessGroupMPI::AllGather( } std::shared_ptr ProcessGroupMPI::AllToAll( - std::vector& in_tensors, - std::vector& out_tensors) { + std::vector& in_tensors, + std::vector& out_tensors) { CheckTensorContiguous(in_tensors); CheckTensorContiguous(out_tensors); @@ -415,8 +414,8 @@ std::shared_ptr ProcessGroupMPI::AllToAll( } std::shared_ptr ProcessGroupMPI::Reduce( - std::vector& tensors, - std::vector& out_tensors, + std::vector& tensors, + std::vector& out_tensors, const ReduceOptions& opts) { CheckTensorContiguous(tensors); CheckTensorContiguous(out_tensors); @@ -445,8 +444,8 @@ std::shared_ptr ProcessGroupMPI::Reduce( } std::shared_ptr ProcessGroupMPI::Scatter( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const ScatterOptions& opts) { CheckTensorContiguous(in_tensors); CheckTensorContiguous(out_tensors); @@ -459,7 +458,7 @@ std::shared_ptr ProcessGroupMPI::Scatter( void* sendbuf = nullptr; if (rank_ == opts.root_rank) { - std::vector& inputData = entry->src_; + std::vector& inputData = entry->src_; sendbuf = inputData[0].data(); } diff --git a/paddle/fluid/distributed/collective/process_group_mpi.h b/paddle/fluid/distributed/collective/process_group_mpi.h index a0ba528ca3cd51..ea6871e6daa92c 100644 --- a/paddle/fluid/distributed/collective/process_group_mpi.h +++ b/paddle/fluid/distributed/collective/process_group_mpi.h @@ -38,10 +38,10 @@ namespace paddle { namespace distributed { struct TaskEntry 
{ - explicit TaskEntry(std::vector* src_ptr, - std::vector* dst_ptr, + explicit TaskEntry(std::vector* src_ptr, + std::vector* dst_ptr, std::function&)> run) - : dst_(dst_ptr ? *dst_ptr : std::vector()), + : dst_(dst_ptr ? *dst_ptr : std::vector()), run_(std::move(run)) { if (src_ptr) { src_ = *src_ptr; @@ -51,8 +51,8 @@ struct TaskEntry { TaskEntry(const TaskEntry&) = delete; TaskEntry& operator=(const TaskEntry&) = delete; - std::vector src_; - std::vector dst_; + std::vector src_; + std::vector dst_; int* srcRank_ = nullptr; std::function&)> run_; @@ -62,8 +62,8 @@ class ProcessGroupMPI : public ProcessGroupWithoutStream { public: class MPITask : public ProcessGroup::Task { public: - explicit MPITask(std::vector outputTensors, - const std::vector& inputTensors) + explicit MPITask(std::vector outputTensors, + const std::vector& inputTensors) : ProcessGroup::Task(-1, inputTensors, CommType::UNKNOWN), outputs_(std::move(outputTensors)) {} @@ -101,7 +101,7 @@ class ProcessGroupMPI : public ProcessGroupWithoutStream { void FinishMPITask(); void FinishMPITaskError(std::exception_ptr eptr); - std::vector outputs_; + std::vector outputs_; std::condition_variable cv_; std::exception_ptr exception_; }; @@ -109,8 +109,7 @@ class ProcessGroupMPI : public ProcessGroupWithoutStream { public: class MPIAsyncTask : public ProcessGroup::Task { public: - MPIAsyncTask(MPI_Request request, - const std::vector& inputs); + MPIAsyncTask(MPI_Request request, const std::vector& inputs); bool IsCompleted(); @@ -118,7 +117,7 @@ class ProcessGroupMPI : public ProcessGroupWithoutStream { bool Wait(std::chrono::milliseconds timeout = kWaitTimeout); - void SetOutputs(std::vector& outputs); // NOLINT + void SetOutputs(std::vector& outputs); // NOLINT virtual ~MPIAsyncTask(); @@ -126,7 +125,7 @@ class ProcessGroupMPI : public ProcessGroupWithoutStream { void AppearException(); private: - std::shared_ptr> outputs_; + std::shared_ptr> outputs_; MPI_Request request_; MPI_Status status_; std::exception_ptr exception_; @@ -139,40 +138,39 @@ class ProcessGroupMPI : public ProcessGroupWithoutStream { std::string GetBackendName() const override { return "MPI"; } std::shared_ptr AllReduce( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const AllreduceOptions& = AllreduceOptions()) override; std::shared_ptr Broadcast( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const BroadcastOptions& = BroadcastOptions()) override; std::shared_ptr Barrier( const BarrierOptions& = BarrierOptions()) override; - std::shared_ptr Send( - std::vector& tensors, int dst_rank) override; + std::shared_ptr Send(std::vector& tensors, + int dst_rank) override; - std::shared_ptr Recv( - std::vector& tensors, int src_rank) override; + std::shared_ptr Recv(std::vector& tensors, + int src_rank) override; std::shared_ptr AllGather( - std::vector& in_tensors, - std::vector& out_tensors) override; + std::vector& in_tensors, + std::vector& out_tensors) override; std::shared_ptr AllToAll( - std::vector& in, - std::vector& out) override; + std::vector& in, std::vector& out) override; std::shared_ptr Reduce( - std::vector& tensors, - std::vector& out_tensors, + std::vector& tensors, + std::vector& out_tensors, const ReduceOptions& opts) override; std::shared_ptr Scatter( - std::vector& in_tensors, - std::vector& out_tensors, + std::vector& in_tensors, + std::vector& out_tensors, const ScatterOptions&) override; static std::shared_ptr 
CreateProcessGroupMPI( @@ -182,8 +180,7 @@ class ProcessGroupMPI : public ProcessGroupWithoutStream { void workLoop(); std::shared_ptr Enqueue( - std::unique_ptr entry, - const std::vector& inputs); + std::unique_ptr entry, const std::vector& inputs); private: bool stop_{false}; diff --git a/paddle/fluid/distributed/collective/process_group_nccl.cc b/paddle/fluid/distributed/collective/process_group_nccl.cc index 6ef4029977b6ee..5fe4a3691d98ac 100644 --- a/paddle/fluid/distributed/collective/process_group_nccl.cc +++ b/paddle/fluid/distributed/collective/process_group_nccl.cc @@ -363,7 +363,7 @@ std::shared_ptr ProcessGroupNCCL::AllToAll( int64_t out_row_size = out_dim[0] == 0 ? 0 : out_tensor->numel() / out_dim[0]; int64_t in_offset = 0, in_numel = 0, out_offset = 0, out_numel = 0; - phi::DenseTensor input_partial, output_partial; + DenseTensor input_partial, output_partial; VLOG(3) << "[AllToAll] " << "sendbuff: " << in_tensor.data() @@ -407,8 +407,8 @@ std::shared_ptr ProcessGroupNCCL::AllToAll( } std::shared_ptr ProcessGroupNCCL::AllToAll( - std::vector* out_tensors, - const std::vector& in_tensors, + std::vector* out_tensors, + const std::vector& in_tensors, bool sync_op, bool use_calc_stream) { CheckTensorContiguous(in_tensors); @@ -492,7 +492,7 @@ std::shared_ptr ProcessGroupNCCL::Barrier( auto allocator = std::unique_ptr( new paddle::experimental::DefaultAllocator(place)); phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1}); - phi::DenseTensor barrier_tensor{allocator.get(), meta}; + DenseTensor barrier_tensor{allocator.get(), meta}; VLOG(3) << "[Barrier] " << "barrier opt: " << opts.device_id; @@ -665,7 +665,7 @@ std::shared_ptr ProcessGroupNCCL::Scatter( int64_t numel = in_tensor.numel() / size_; if (rank_ == opts.root_rank) { int64_t offset = 0; - phi::DenseTensor partial_tensor; + DenseTensor partial_tensor; GroupStart(); for (auto i = 0; i < size_; i++) { partial_tensor = GetPartialTensor(in_tensor, offset, numel); @@ -693,7 +693,7 @@ std::shared_ptr ProcessGroupNCCL::Gather( CheckTensorContiguous(in_tensor); CheckTensorContiguous(*out_tensor); - std::vector partial_tensors; + std::vector partial_tensors; if (rank_ == opts.root_rank) { partial_tensors.reserve(size_); size_t offset = 0; @@ -709,7 +709,7 @@ std::shared_ptr ProcessGroupNCCL::Gather( } std::shared_ptr ProcessGroupNCCL::Gather( - std::vector* gather_tensors_ptr, + std::vector* gather_tensors_ptr, const phi::DenseTensor& in_tensor, const GatherOptions& opts, bool sync_op, @@ -774,7 +774,7 @@ std::shared_ptr ProcessGroupNCCL::Recv( bool use_calc_stream) { CheckTensorContiguous(*tensor); // numel > 0 indicates the tensor need to be sliced - phi::DenseTensor partial_tensor; + DenseTensor partial_tensor; if (numel > 0) { partial_tensor = GetPartialTensor(*tensor, offset, numel); tensor = &partial_tensor; @@ -1092,7 +1092,7 @@ void ProcessGroupNCCL::EagerConnectRingExchange() { std::shared_ptr ProcessGroupNCCL::Collective( std::function fn, - const std::vector& tensors, + const std::vector& tensors, CommType comm_type, bool sync_op, bool use_calc_stream) { @@ -1171,7 +1171,7 @@ std::shared_ptr ProcessGroupNCCL::Collective( } else { for (size_t i = 0; i < tensors.size(); ++i) { coalescing_tensors_.emplace_back( - std::make_shared(tensors[i])); + std::make_shared(tensors[i])); } coalescing_place_keys_.push_back(key); } @@ -1203,7 +1203,7 @@ std::shared_ptr ProcessGroupNCCL::Collective( CommType comm_type, bool sync_op, bool use_calc_stream) { - const std::vector tensors = {tensor}; + const std::vector tensors = 
@@ -492,7 +492,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Barrier(
   auto allocator = std::unique_ptr<phi::Allocator>(
       new paddle::experimental::DefaultAllocator(place));
   phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1});
-  phi::DenseTensor barrier_tensor{allocator.get(), meta};
+  DenseTensor barrier_tensor{allocator.get(), meta};
 
   VLOG(3) << "[Barrier] "
           << "barrier opt: " << opts.device_id;
@@ -665,7 +665,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Scatter(
   int64_t numel = in_tensor.numel() / size_;
   if (rank_ == opts.root_rank) {
     int64_t offset = 0;
-    phi::DenseTensor partial_tensor;
+    DenseTensor partial_tensor;
     GroupStart();
     for (auto i = 0; i < size_; i++) {
       partial_tensor = GetPartialTensor(in_tensor, offset, numel);
@@ -693,7 +693,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Gather(
   CheckTensorContiguous(in_tensor);
   CheckTensorContiguous(*out_tensor);
 
-  std::vector<phi::DenseTensor> partial_tensors;
+  std::vector<DenseTensor> partial_tensors;
   if (rank_ == opts.root_rank) {
     partial_tensors.reserve(size_);
     size_t offset = 0;
@@ -709,7 +709,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Gather(
 }
 
 std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Gather(
-    std::vector<phi::DenseTensor>* gather_tensors_ptr,
+    std::vector<DenseTensor>* gather_tensors_ptr,
     const phi::DenseTensor& in_tensor,
     const GatherOptions& opts,
     bool sync_op,
@@ -774,7 +774,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv(
     bool use_calc_stream) {
   CheckTensorContiguous(*tensor);
   // numel > 0 indicates the tensor need to be sliced
-  phi::DenseTensor partial_tensor;
+  DenseTensor partial_tensor;
   if (numel > 0) {
     partial_tensor = GetPartialTensor(*tensor, offset, numel);
     tensor = &partial_tensor;
@@ -1092,7 +1092,7 @@ void ProcessGroupNCCL::EagerConnectRingExchange() {
 
 std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
     std::function fn,
-    const std::vector<phi::DenseTensor>& tensors,
+    const std::vector<DenseTensor>& tensors,
     CommType comm_type,
     bool sync_op,
     bool use_calc_stream) {
@@ -1171,7 +1171,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
   } else {
     for (size_t i = 0; i < tensors.size(); ++i) {
       coalescing_tensors_.emplace_back(
-          std::make_shared<phi::DenseTensor>(tensors[i]));
+          std::make_shared<DenseTensor>(tensors[i]));
     }
     coalescing_place_keys_.push_back(key);
   }
@@ -1203,7 +1203,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
     CommType comm_type,
     bool sync_op,
     bool use_calc_stream) {
-  const std::vector<phi::DenseTensor> tensors = {tensor};
+  const std::vector<DenseTensor> tensors = {tensor};
   return Collective(fn, tensors, comm_type, sync_op, use_calc_stream);
 }
 
@@ -1304,8 +1304,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Point2Point(
     task->UpdateWaitChain(*comm_ctx);
     allocation_stream_pairs_.emplace_back(tensor.Holder(), nccl_stream);
   } else {
-    coalescing_tensors_.emplace_back(
-        std::make_shared<phi::DenseTensor>(tensor));
+    coalescing_tensors_.emplace_back(std::make_shared<DenseTensor>(tensor));
     coalescing_place_keys_.push_back(key);
   }
 }
diff --git a/paddle/fluid/distributed/collective/process_group_nccl.h b/paddle/fluid/distributed/collective/process_group_nccl.h
index cd63d67e2f25c0..fc1d662291b34f 100644
--- a/paddle/fluid/distributed/collective/process_group_nccl.h
+++ b/paddle/fluid/distributed/collective/process_group_nccl.h
@@ -62,7 +62,7 @@ class ProcessGroupNCCL final : public ProcessGroupWithStream {
     NCCLTask(const std::vector<Place>& places,
              int rank,
             CommType CommType,
-             const std::vector<phi::DenseTensor>& inputs);
+             const std::vector<DenseTensor>& inputs);
 
     void RemoveHolderStreamInGroup();
 
@@ -126,8 +126,8 @@ class ProcessGroupNCCL final : public ProcessGroupWithStream {
       bool use_calc_stream) override;
 
   std::shared_ptr<ProcessGroup::Task> AllToAll(
-      std::vector<phi::DenseTensor>* out_tensors,
-      const std::vector<phi::DenseTensor>& in_tensors,
+      std::vector<DenseTensor>* out_tensors,
+      const std::vector<DenseTensor>& in_tensors,
       bool sync_op,
       bool use_calc_stream) override;
 
@@ -167,7 +167,7 @@ class ProcessGroupNCCL final : public ProcessGroupWithStream {
       bool use_calc_stream) override;
 
   std::shared_ptr<ProcessGroup::Task> Gather(
-      std::vector<phi::DenseTensor>* gather_tensors_ptr,
+      std::vector<DenseTensor>* gather_tensors_ptr,
       const phi::DenseTensor& in_tensor,
       const GatherOptions& opts,
       bool sync_op,
@@ -230,7 +230,7 @@ class ProcessGroupNCCL final : public ProcessGroupWithStream {
   std::shared_ptr<ProcessGroup::Task> Collective(
       std::function fn,
-      const std::vector<phi::DenseTensor>& tensors,
+      const std::vector<DenseTensor>& tensors,
       CommType comm_type,
       bool sync_op,
       bool use_calc_stream);
 
@@ -292,7 +292,7 @@ class ProcessGroupNCCL final : public ProcessGroupWithStream {
   // For coalescing tensors processing (eg. batch_isend_irecv)
   bool is_coalescing_{false};
-  std::vector<std::shared_ptr<phi::DenseTensor>> coalescing_tensors_;
+  std::vector<std::shared_ptr<DenseTensor>> coalescing_tensors_;
   std::vector<std::string> coalescing_place_keys_;
 
   std::unordered_map
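The coalescing_tensors_ and is_coalescing_ members touched above back the batch_isend_irecv-style grouping in Collective and Point2Point: while a batch is open, operations are only recorded, and shared_ptr copies keep the buffers alive until the grouped launch. A rough standalone model of that pattern (invented class and names, not Paddle's implementation):

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Minimal model of "coalescing": while a batch is open, operations are only
// recorded; ending the batch issues them together.  Holding shared_ptr copies
// of the buffers mirrors why the diff keeps coalescing_tensors_ around: the
// memory must stay alive until the batched communication actually runs.
class BatchingQueue {
 public:
  void StartCoalescing() { is_coalescing_ = true; }

  void Submit(std::shared_ptr<std::vector<float>> buffer,
              const std::string& op_name) {
    if (is_coalescing_) {
      pending_buffers_.push_back(std::move(buffer));  // keep the buffer alive
      pending_ops_.push_back(op_name);
      return;
    }
    Issue(op_name);  // no batch open: issue immediately
  }

  void EndCoalescing() {
    for (const auto& op : pending_ops_) Issue(op);  // one grouped launch
    pending_ops_.clear();
    pending_buffers_.clear();                       // now safe to release
    is_coalescing_ = false;
  }

 private:
  void Issue(const std::string& op) { std::cout << "issue " << op << "\n"; }

  bool is_coalescing_ = false;
  std::vector<std::shared_ptr<std::vector<float>>> pending_buffers_;
  std::vector<std::string> pending_ops_;
};

int main() {
  BatchingQueue q;
  q.StartCoalescing();
  q.Submit(std::make_shared<std::vector<float>>(8, 1.0f), "send->rank1");
  q.Submit(std::make_shared<std::vector<float>>(8, 2.0f), "recv<-rank2");
  q.EndCoalescing();  // both operations issued back to back
  return 0;
}
```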
diff --git a/paddle/fluid/distributed/collective/process_group_with_stream.h b/paddle/fluid/distributed/collective/process_group_with_stream.h
index 160568e36b01d2..81ec599585b4bc 100644
--- a/paddle/fluid/distributed/collective/process_group_with_stream.h
+++ b/paddle/fluid/distributed/collective/process_group_with_stream.h
@@ -37,12 +37,12 @@ class ProcessGroupWithStream : public ProcessGroup {
     // TODO(liyurui): This constructor is temporary here for compatible reason,
     // will be deleted soon.
     TaskStream(int rank,
-               const std::vector<phi::DenseTensor>& inputs,
+               const std::vector<DenseTensor>& inputs,
                CommType comm_type)
         : Task(rank, inputs, comm_type) {}
 
     TaskStream(int rank,
-               const std::vector<phi::DenseTensor>& inputs,
+               const std::vector<DenseTensor>& inputs,
                CommType comm_type,
                bool sync_op,
                bool use_calc_stream)
@@ -166,8 +166,8 @@ class ProcessGroupWithStream : public ProcessGroup {
   }
 
   std::shared_ptr<ProcessGroup::Task> AllToAll(
-      std::vector<phi::DenseTensor>* out_tensors,
-      const std::vector<phi::DenseTensor>& in_tensors,
+      std::vector<DenseTensor>* out_tensors,
+      const std::vector<DenseTensor>& in_tensors,
       bool sync_op) {
     return AllToAll(out_tensors,
                     in_tensors,
@@ -176,8 +176,8 @@ class ProcessGroupWithStream : public ProcessGroup {
   }
 
   std::shared_ptr<ProcessGroup::Task> AllToAll(
-      std::vector<phi::DenseTensor>* out_tensors UNUSED,
-      const std::vector<phi::DenseTensor>& in_tensors UNUSED,
+      std::vector<DenseTensor>* out_tensors UNUSED,
+      const std::vector<DenseTensor>& in_tensors UNUSED,
       bool sync_op UNUSED,
       bool use_calc_stream UNUSED) override {
     PADDLE_THROW(common::errors::Unimplemented(
diff --git a/paddle/fluid/distributed/collective/reducer.cc b/paddle/fluid/distributed/collective/reducer.cc
index b75a7fdada5908..c27ee4002da3b1 100644
--- a/paddle/fluid/distributed/collective/reducer.cc
+++ b/paddle/fluid/distributed/collective/reducer.cc
@@ -107,8 +107,7 @@ std::vector<std::vector<size_t>> Eager_AssignGroupBySize(
     int64_t var_size = -1;
 
     if (var.is_dense_tensor()) {
-      var_size =
-          std::dynamic_pointer_cast<phi::DenseTensor>(var.impl())->numel();
+      var_size = std::dynamic_pointer_cast<DenseTensor>(var.impl())->numel();
     } else if (dialect::IrTensor::classof(var.impl().get())) {
       var_size = var.numel();
     } else {
@@ -163,15 +162,14 @@ std::vector<std::vector<size_t>> Eager_AssignGroupBySize(
 template <typename DeviceContext, typename T>
 struct ConcatTensorsForAllReduce {
   void operator()(const DeviceContext &context,
-                  const std::vector<phi::DenseTensor> &dense_tensors_,
+                  const std::vector<DenseTensor> &dense_tensors_,
                   Tensor *p_dense_contents) {
     phi::funcs::ConcatFunctor<DeviceContext, T> concat_functor_;
     concat_functor_(
         context,
         dense_tensors_,
         0,
-        std::dynamic_pointer_cast<phi::DenseTensor>(p_dense_contents->impl())
-            .get());
+        std::dynamic_pointer_cast<DenseTensor>(p_dense_contents->impl()).get());
   }
 };
 
@@ -179,12 +177,11 @@ template <typename DeviceContext, typename T>
 struct SplitTensorsForAllReduce {
   void operator()(const DeviceContext &context,
                   Tensor *p_dense_contents,
-                  std::vector<phi::DenseTensor> *p_dense_tensors) {
+                  std::vector<DenseTensor> *p_dense_tensors) {
     auto *in =
-        std::dynamic_pointer_cast<phi::DenseTensor>(p_dense_contents->impl())
-            .get();
-    std::vector<phi::DenseTensor *> outs;
-    std::vector<const phi::DenseTensor *> shape_refer;
+        std::dynamic_pointer_cast<DenseTensor>(p_dense_contents->impl()).get();
+    std::vector<DenseTensor *> outs;
+    std::vector<const DenseTensor *> shape_refer;
 
     outs.reserve(p_dense_tensors->size());
     shape_refer.reserve(p_dense_tensors->size());
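ConcatTensorsForAllReduce and SplitTensorsForAllReduce exist so that a group of gradients can be fused into one contiguous buffer, reduced with a single collective, and scattered back afterwards. A toy version of that fuse/split round trip, using plain std::vector<float> instead of DenseTensor, is sketched below:

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Fuse several small per-parameter buffers into one contiguous buffer
// (the role of ConcatTensorsForAllReduce before a single allreduce), then
// copy the result back into the original buffers (SplitTensorsForAllReduce).
std::vector<float> Concat(const std::vector<std::vector<float>>& parts) {
  std::vector<float> fused;
  for (const auto& p : parts) fused.insert(fused.end(), p.begin(), p.end());
  return fused;
}

void Split(const std::vector<float>& fused,
           std::vector<std::vector<float>>* parts) {
  std::size_t offset = 0;
  for (auto& p : *parts) {
    std::copy(fused.begin() + offset, fused.begin() + offset + p.size(),
              p.begin());
    offset += p.size();
  }
}

int main() {
  std::vector<std::vector<float>> grads = {{1, 2}, {3, 4, 5}, {6}};
  std::vector<float> fused = Concat(grads);  // one buffer, one collective
  for (float& v : fused) v *= 0.5f;          // stand-in for the allreduce/average
  Split(fused, &grads);                      // copy results back per parameter
  for (const auto& g : grads) {
    for (float v : g) std::cout << v << ' ';
    std::cout << '\n';
  }
  return 0;
}
```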
@@ -204,12 +201,11 @@ struct ConcatTensorsForAllReduce<phi::CustomContext, T> {
   void operator()(const phi::CustomContext &context,
-                  const std::vector<phi::DenseTensor> &dense_tensors_,
+                  const std::vector<DenseTensor> &dense_tensors_,
                   Tensor *p_dense_contents) {
     phi::DeviceGuard guard(context.GetPlace());
     auto *out =
-        std::dynamic_pointer_cast<phi::DenseTensor>(p_dense_contents->impl())
-            .get();
+        std::dynamic_pointer_cast<DenseTensor>(p_dense_contents->impl()).get();
     uint8_t *out_data = reinterpret_cast<uint8_t *>(out->data());
     auto *device = phi::DeviceManager::GetDeviceWithPlace(context.GetPlace());
     phi::stream::Stream stream(context.GetPlace(), context.stream());
@@ -233,10 +229,9 @@ template <typename T>
 struct SplitTensorsForAllReduce<phi::CustomContext, T> {
   void operator()(const phi::CustomContext &context,
                   Tensor *p_dense_contents,
-                  std::vector<phi::DenseTensor> *p_dense_tensors) {
+                  std::vector<DenseTensor> *p_dense_tensors) {
     auto *in =
-        std::dynamic_pointer_cast<phi::DenseTensor>(p_dense_contents->impl())
-            .get();
+        std::dynamic_pointer_cast<DenseTensor>(p_dense_contents->impl()).get();
     uint8_t *in_data = reinterpret_cast<uint8_t *>(in->data());
     auto *device = phi::DeviceManager::GetDeviceWithPlace(context.GetPlace());
     phi::stream::Stream stream(context.GetPlace(), context.stream());
@@ -260,7 +255,7 @@ struct SplitTensorsForAllReduce<phi::CustomContext, T> {
 template <typename DeviceContext, typename T>
 static void ConcatTensorsWithType(
     const DeviceContext &context,
-    const std::vector<phi::DenseTensor> &dense_tensors_,
+    const std::vector<DenseTensor> &dense_tensors_,
     Tensor *p_dense_contents,
     phi::DataType type) {
   switch (type) {
@@ -292,7 +287,7 @@ static void ConcatTensorsWithType(
 template <typename DeviceContext, typename T>
 static void SplitTensorsWithType(const DeviceContext &context,
                                  Tensor *p_dense_contents,
-                                 std::vector<phi::DenseTensor> *p_dense_tensors,
+                                 std::vector<DenseTensor> *p_dense_tensors,
                                  phi::DataType type) {
   switch (type) {
     case phi::DataType::FLOAT16:
@@ -324,7 +319,7 @@ static void SplitTensorsWithType(const DeviceContext &context,
 template <>
 void ConcatTensorsWithType(
     const phi::XPUContext &context,
-    const std::vector<phi::DenseTensor> &dense_tensors_,
+    const std::vector<DenseTensor> &dense_tensors_,
     Tensor *p_dense_contents,
     phi::DataType type) {
   switch (type) {
@@ -353,7 +348,7 @@ template <>
 void SplitTensorsWithType(
     const phi::XPUContext &context,
     Tensor *p_dense_contents,
-    std::vector<phi::DenseTensor> *p_dense_tensors,
+    std::vector<DenseTensor> *p_dense_tensors,
     phi::DataType type) {
   switch (type) {
     case phi::DataType::FLOAT32:
@@ -435,7 +430,7 @@ void EagerGroup::SplitTensors(const phi::DeviceContext &context) {
         gpu_context, &dense_contents_, &dense_tensors_, dtype_);
     if (IsStreamSafeAllocator()) {
       auto dense_tensor =
-          std::dynamic_pointer_cast<phi::DenseTensor>(dense_contents_.impl());
+          std::dynamic_pointer_cast<DenseTensor>(dense_contents_.impl());
       VLOG(3) << "Free dense_contents_ " << dense_contents_.numel();
       memory::RecordStream(dense_tensor->Holder(), gpu_context.stream());
       dense_contents_.reset();
@@ -774,7 +769,7 @@ void EagerReducer::AddDistHook(size_t var_index) {
 
     if (HasGrad(var_index)) {
      auto grad_dense_tensor =
-          *(std::dynamic_pointer_cast<phi::DenseTensor>(grad_tensor.impl()));
+          *(std::dynamic_pointer_cast<DenseTensor>(grad_tensor.impl()));
       group_tensor.ShareDataWith(grad_dense_tensor);
     }
     return;
@@ -853,16 +848,16 @@ void EagerReducer::MarkVarReady(const size_t var_index,
     if (grad_tensor.is_dense_tensor()) {
       const auto &tensor_impl = grad_tensor.impl();
       auto dense_tensor =
-          std::dynamic_pointer_cast<phi::DenseTensor>(tensor_impl);
+          std::dynamic_pointer_cast<DenseTensor>(tensor_impl);
       if (!dense_tensor->meta().is_contiguous()) {
-        grad_tensor.set_impl(std::make_shared<phi::DenseTensor>(
+        grad_tensor.set_impl(std::make_shared<DenseTensor>(
             paddle::experimental::Trans2Contiguous(*dense_tensor)));
       }
     }
 
     group_tensor
-        .ShareDataWith(*(std::dynamic_pointer_cast<phi::DenseTensor>(
-            grad_tensor.impl())))
+        .ShareDataWith(
+            *(std::dynamic_pointer_cast<DenseTensor>(grad_tensor.impl())))
         .Resize({grad_tensor.numel()});
   } else {
     VLOG(3) << "Tensor[" << tensors_[var_index].name()
@@ -886,16 +881,16 @@ void EagerReducer::MarkVarReady(const size_t var_index,
     if (grad_tensor->is_dense_tensor()) {
       const auto &tensor_impl = grad_tensor->impl();
       auto dense_tensor =
-          std::dynamic_pointer_cast<phi::DenseTensor>(tensor_impl);
+          std::dynamic_pointer_cast<DenseTensor>(tensor_impl);
       if (!dense_tensor->meta().is_contiguous()) {
-        grad_tensor->set_impl(std::make_shared<phi::DenseTensor>(
+        grad_tensor->set_impl(std::make_shared<DenseTensor>(
             paddle::experimental::Trans2Contiguous(*dense_tensor)));
       }
     }
 
     group_tensor
-        .ShareDataWith(*(std::dynamic_pointer_cast<phi::DenseTensor>(
-            grad_tensor->impl())))
+        .ShareDataWith(
+            *(std::dynamic_pointer_cast<DenseTensor>(grad_tensor->impl())))
         .Resize({length});
   } else {
     VLOG(3) << "Tensor[" << tensors_[var_index].name()
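The meta().is_contiguous() checks above guard ShareDataWith: a strided view cannot be handed to kernels that expect densely packed data, which is why Trans2Contiguous materializes a packed copy first. A small illustration of packing a strided 2-D view into a contiguous buffer (the shapes and strides are made up for the example):

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Copy a strided 2-D view (row stride larger than its width) into a densely
// packed buffer.  This is roughly the role a "make contiguous" step plays
// before storage can be shared with kernels that assume packed layout.
std::vector<float> ToContiguous(const float* base, std::size_t rows,
                                std::size_t cols, std::size_t row_stride) {
  std::vector<float> packed;
  packed.reserve(rows * cols);
  for (std::size_t r = 0; r < rows; ++r)
    for (std::size_t c = 0; c < cols; ++c)
      packed.push_back(base[r * row_stride + c]);  // skip the padding columns
  return packed;
}

int main() {
  // A 3x2 logical view laid out in a buffer whose physical rows are 4 wide
  // (stride 4 != width 2, so the view is not contiguous).
  std::vector<float> storage = {0, 1, -1, -1, 2, 3, -1, -1, 4, 5, -1, -1};
  std::vector<float> packed = ToContiguous(storage.data(), 3, 2, 4);
  for (float v : packed) std::cout << v << ' ';  // prints: 0 1 2 3 4 5
  std::cout << '\n';
  return 0;
}
```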
@@ -998,18 +993,17 @@ void EagerReducer::ProcessUnusedDenseVars() {
   const auto *dev_ctx = phi::DeviceContextPool::Instance().Get(inner_place_);
   auto *global_used_tensor =
-      std::dynamic_pointer_cast<phi::DenseTensor>(global_used_vars_.impl())
-          .get();
+      std::dynamic_pointer_cast<DenseTensor>(global_used_vars_.impl()).get();
   framework::TensorFromVector(
       local_used_vars_, *dev_ctx, global_used_tensor);
 
   distributed::AllreduceOptions opts;
   opts.reduce_op = ReduceOp::SUM;
   std::vector<Tensor> reduce_tensors = {global_used_vars_};
-  std::vector<phi::DenseTensor> in_out;
+  std::vector<DenseTensor> in_out;
   in_out.reserve(reduce_tensors.size());
   for (auto &t : reduce_tensors) {
-    in_out.push_back(*std::dynamic_pointer_cast<phi::DenseTensor>(t.impl()));
+    in_out.push_back(*std::dynamic_pointer_cast<DenseTensor>(t.impl()));
   }
   process_group_->AllReduce(in_out, in_out, opts)->Synchronize();
@@ -1052,7 +1046,7 @@ void EagerReducer::ProcessUnusedDenseVars() {
           GetGradNodeFromTensor(&tensors_[var_index]))
           ->SetFakeEmpty(false);
 
-      Tensor grad_value(std::make_shared<phi::DenseTensor>(src_tensor));
+      Tensor grad_value(std::make_shared<DenseTensor>(src_tensor));
 
       auto dest_var_base = tensors_[var_index];
       auto grad_tensor = egr::EagerUtils::mutable_grad(dest_var_base);
@@ -1103,10 +1097,10 @@ void EagerReducer::FusedAllReduceSchedule(EagerGroup *group,
 
   // all_reduce
   std::vector<Tensor> reduce_tensors = {group->dense_contents_};
-  std::vector<phi::DenseTensor> in_out;
+  std::vector<DenseTensor> in_out;
   in_out.reserve(reduce_tensors.size());
   for (auto &t : reduce_tensors) {
-    in_out.push_back(*std::dynamic_pointer_cast<phi::DenseTensor>(t.impl()));
+    in_out.push_back(*std::dynamic_pointer_cast<DenseTensor>(t.impl()));
   }
   group->task = process_group_->AllReduce(in_out, in_out, opts);
@@ -1182,17 +1176,17 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
   Tensor rows_num_tensor = paddle::experimental::empty(
       IntArray({static_cast<int64_t>(size_)}), DataType::INT64, inner_place_);
   auto *rows_num_dense_tensor =
-      std::dynamic_pointer_cast<phi::DenseTensor>(rows_num_tensor.impl()).get();
+      std::dynamic_pointer_cast<DenseTensor>(rows_num_tensor.impl()).get();
   framework::TensorFromVector(
       rows_num_vector, *dev_ctx, rows_num_dense_tensor);
 
   distributed::AllreduceOptions opts;
   opts.reduce_op = ReduceOp::SUM;
   std::vector<Tensor> reduce_tensors = {rows_num_tensor};
-  std::vector<phi::DenseTensor> in_out;
+  std::vector<DenseTensor> in_out;
   in_out.reserve(reduce_tensors.size());
   for (auto &t : reduce_tensors) {
-    in_out.push_back(*std::dynamic_pointer_cast<phi::DenseTensor>(t.impl()));
+    in_out.push_back(*std::dynamic_pointer_cast<DenseTensor>(t.impl()));
   }
   process_group_->AllReduce(in_out, in_out, opts)->Synchronize();
@@ -1210,7 +1204,7 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
 
   dev_ctx->Wait();
 
-  Tensor src_value_tensor(std::make_shared<phi::DenseTensor>(src->value()));
+  Tensor src_value_tensor(std::make_shared<DenseTensor>(src->value()));
   std::vector<int64_t> dst_shape = src_value_tensor.shape();
 
   if (std::all_of(cpu_rows_num_ptr, cpu_rows_num_ptr + size_, [&](int64_t row) {
@@ -1230,35 +1224,33 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
                                              DataType::INT64,
                                              inner_place_);
     auto *src_rows_dense_tensor =
-        std::dynamic_pointer_cast<phi::DenseTensor>(src_rows_tensor.impl())
-            .get();
+        std::dynamic_pointer_cast<DenseTensor>(src_rows_tensor.impl()).get();
     framework::TensorFromVector(
         (*src).rows(), *dev_ctx, src_rows_dense_tensor);
 
     std::vector<Tensor> src_rows_tensors = {src_rows_tensor};
     std::vector<Tensor> dst_rows_tensors = {dst_rows_tensor};
-    std::vector<phi::DenseTensor> in;
-    std::vector<phi::DenseTensor> out;
+    std::vector<DenseTensor> in;
+    std::vector<DenseTensor> out;
     in.reserve(src_rows_tensors.size());
     out.reserve(dst_rows_tensors.size());
     for (auto &t : src_rows_tensors) {
-      in.push_back(*std::dynamic_pointer_cast<phi::DenseTensor>(t.impl()));
+      in.push_back(*std::dynamic_pointer_cast<DenseTensor>(t.impl()));
     }
     for (auto &t : dst_rows_tensors) {
-      out.push_back(*std::dynamic_pointer_cast<phi::DenseTensor>(t.impl()));
+      out.push_back(*std::dynamic_pointer_cast<DenseTensor>(t.impl()));
     }
     process_group_->AllGather(in, out)->Synchronize();
 
     phi::Vector<int64_t> dst_rows_vector(rows_num, 0);
     auto *dst_rows_dense_tensor =
-        std::dynamic_pointer_cast<phi::DenseTensor>(dst_rows_tensor.impl())
-            .get();
+        std::dynamic_pointer_cast<DenseTensor>(dst_rows_tensor.impl()).get();
     framework::TensorToVector(
         *dst_rows_dense_tensor, *dev_ctx, &dst_rows_vector);
     dev_ctx->Wait();
 
     dst_shape[dst_shape.size() - 2] = rows_num;
-    auto dst_dense_tensor = std::dynamic_pointer_cast<phi::DenseTensor>(
+    auto dst_dense_tensor = std::dynamic_pointer_cast<DenseTensor>(
         paddle::experimental::full(
             IntArray(dst_shape), 0, src_value_tensor.dtype(), inner_place_)
             .impl());
@@ -1266,27 +1258,25 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
     auto dst = std::make_shared<phi::SelectedRows>(dst_rows_vector, (*src).height());
     *(dst->mutable_value()) = *dst_dense_tensor;
-    Tensor dst_value_tensor(std::make_shared<phi::DenseTensor>(dst->value()));
+    Tensor dst_value_tensor(std::make_shared<DenseTensor>(dst->value()));
 
     std::vector<Tensor> src_value_tensors = {src_value_tensor};
     std::vector<Tensor> dst_value_tensors = {dst_value_tensor};
-    std::vector<phi::DenseTensor> src_dense;
-    std::vector<phi::DenseTensor> dst_dense;
+    std::vector<DenseTensor> src_dense;
+    std::vector<DenseTensor> dst_dense;
     src_dense.reserve(src_value_tensors.size());
     dst_dense.reserve(dst_value_tensors.size());
 
     for (auto &t : src_value_tensors) {
-      src_dense.push_back(
-          *std::dynamic_pointer_cast<phi::DenseTensor>(t.impl()));
+      src_dense.push_back(*std::dynamic_pointer_cast<DenseTensor>(t.impl()));
     }
     for (auto &t : dst_value_tensors) {
-      dst_dense.push_back(
-          *std::dynamic_pointer_cast<phi::DenseTensor>(t.impl()));
+      dst_dense.push_back(*std::dynamic_pointer_cast<DenseTensor>(t.impl()));
     }
     process_group_->AllGather(src_dense, dst_dense)->Synchronize();
 
     src->set_rows(dst_rows_vector);
     *(src->mutable_value()) =
-        *(std::dynamic_pointer_cast<phi::DenseTensor>(dst_value_tensor.impl()));
+        *(std::dynamic_pointer_cast<DenseTensor>(dst_value_tensor.impl()));
   } else {
     std::vector<Tensor> rows_tensors;
     std::vector<Tensor> values_tensors;
@@ -1301,22 +1291,20 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
                                                 inner_place_);
       Tensor values_tensor = paddle::experimental::full(
           IntArray(value_tensor_shape), 0, src->value().dtype(), inner_place_);
-      std::vector<phi::DenseTensor> rows_dense_vector;
-      std::vector<phi::DenseTensor> values_dense_vector;
+      std::vector<DenseTensor> rows_dense_vector;
+      std::vector<DenseTensor> values_dense_vector;
 
       if (i == rank_) {
         auto *rows_dense_tensor =
-            std::dynamic_pointer_cast<phi::DenseTensor>(rows_tensor.impl())
-                .get();
+            std::dynamic_pointer_cast<DenseTensor>(rows_tensor.impl()).get();
         framework::TensorFromVector(
             src_rows, *dev_ctx, rows_dense_tensor);
-        values_tensor.set_impl(
-            std::make_shared<phi::DenseTensor>(src->value()));
+        values_tensor.set_impl(std::make_shared<DenseTensor>(src->value()));
       }
       rows_dense_vector.push_back(
-          *std::dynamic_pointer_cast<phi::DenseTensor>(rows_tensor.impl()));
+          *std::dynamic_pointer_cast<DenseTensor>(rows_tensor.impl()));
       values_dense_vector.push_back(
-          *std::dynamic_pointer_cast<phi::DenseTensor>(values_tensor.impl()));
+          *std::dynamic_pointer_cast<DenseTensor>(values_tensor.impl()));
 
       auto b_opts = BroadcastOptions();
       b_opts.source_rank = i;
@@ -1332,16 +1320,15 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
         paddle::experimental::concat(rows_tensors, phi::Scalar(0));
     phi::Vector<int64_t> dst_rows_vector(rows_num, 0);
     auto *dst_rows_dense_tensor =
-        std::dynamic_pointer_cast<phi::DenseTensor>(dst_rows_tensor.impl())
-            .get();
+        std::dynamic_pointer_cast<DenseTensor>(dst_rows_tensor.impl()).get();
     framework::TensorToVector(
         *dst_rows_dense_tensor, *dev_ctx, &dst_rows_vector);
     src->set_rows(dst_rows_vector);
 
     Tensor dst_values_tensor =
         paddle::experimental::concat(values_tensors, phi::Scalar(0));
-    *(src->mutable_value()) = *(
-        std::dynamic_pointer_cast<phi::DenseTensor>(dst_values_tensor.impl()));
+    *(src->mutable_value()) =
+        *(std::dynamic_pointer_cast<DenseTensor>(dst_values_tensor.impl()));
   }
 }
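AllReduceSparse above is a two-phase exchange: ranks first agree on per-rank row counts, and only then move the variable-length rows and values. The same pattern written in plain MPI, as an illustrative sketch rather than Paddle code (the NCCL path in the diff uses AllReduce/AllGather on DenseTensors instead):

```cpp
#include <mpi.h>

#include <iostream>
#include <numeric>
#include <vector>

// Two-phase exchange of variable-length row lists: first gather per-rank row
// counts, then gather the rows themselves with displacements computed from
// those counts.  Build with mpic++ and run under mpirun.
int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank = 0, size = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Each rank owns a different number of sparse rows.
  std::vector<long long> local_rows(rank + 1);
  std::iota(local_rows.begin(), local_rows.end(), 10LL * rank);
  int local_count = static_cast<int>(local_rows.size());

  // Phase 1: everyone learns everyone else's row count.
  std::vector<int> counts(size);
  MPI_Allgather(&local_count, 1, MPI_INT, counts.data(), 1, MPI_INT,
                MPI_COMM_WORLD);

  // Phase 2: gather the variable-length row lists using those counts.
  std::vector<int> displs(size, 0);
  for (int i = 1; i < size; ++i) displs[i] = displs[i - 1] + counts[i - 1];
  int total = displs[size - 1] + counts[size - 1];

  std::vector<long long> all_rows(total);
  MPI_Allgatherv(local_rows.data(), local_count, MPI_LONG_LONG,
                 all_rows.data(), counts.data(), displs.data(), MPI_LONG_LONG,
                 MPI_COMM_WORLD);

  if (rank == 0) {
    std::cout << "gathered " << all_rows.size() << " rows\n";
  }
  MPI_Finalize();
  return 0;
}
```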
diff --git a/paddle/fluid/distributed/collective/reducer.h b/paddle/fluid/distributed/collective/reducer.h
index b262426b7a5e07..f35337af6b87b9 100644
--- a/paddle/fluid/distributed/collective/reducer.h
+++ b/paddle/fluid/distributed/collective/reducer.h
@@ -51,7 +51,7 @@ class EagerGroup {
   bool is_sparse_ = false;
 
   // for concat kernel
-  std::vector<phi::DenseTensor> dense_tensors_;
+  std::vector<DenseTensor> dense_tensors_;
   std::vector<int64_t> length_;
   int64_t all_length_{0};
   std::vector<IntArray> origin_shapes_;
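For context on how dense_tensors_, length_, and all_length_ get filled: upstream, Eager_AssignGroupBySize packs parameters into groups up to a size budget, and each EagerGroup then concatenates its members for one fused all-reduce. A rough, self-contained model of the size-based bucketing (the byte sizes and the budget below are made up for illustration):

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Greedily pack tensor indices into groups until a per-group byte budget is
// exceeded, then start a new group.  This mirrors the role size-based group
// assignment plays ahead of the fused all-reduce in the reducer.
std::vector<std::vector<std::size_t>> AssignGroupsBySize(
    const std::vector<std::size_t>& var_bytes, std::size_t group_limit) {
  std::vector<std::vector<std::size_t>> groups;
  std::vector<std::size_t> current;
  std::size_t current_bytes = 0;
  for (std::size_t i = 0; i < var_bytes.size(); ++i) {
    current.push_back(i);
    current_bytes += var_bytes[i];
    if (current_bytes >= group_limit) {  // budget reached: close this group
      groups.push_back(current);
      current.clear();
      current_bytes = 0;
    }
  }
  if (!current.empty()) groups.push_back(current);  // remainder group
  return groups;
}

int main() {
  const std::vector<std::size_t> var_bytes = {4096, 1024, 512, 8192, 256};
  const auto groups = AssignGroupsBySize(var_bytes, 4096);
  for (std::size_t g = 0; g < groups.size(); ++g) {
    std::cout << "group " << g << ':';
    for (auto idx : groups[g]) std::cout << ' ' << idx;
    std::cout << '\n';
  }
  return 0;
}
```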