-
Notifications
You must be signed in to change notification settings - Fork 340
Intra-node shared memory (SHM) optimizations for CPU primitives #458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Hi @gaopengff! Thank you for your pull request and welcome to our community. Action RequiredIn order to merge any pull request (code, docs, etc.), we require contributors to sign our Contributor License Agreement, and we don't seem to have one on file for you. ProcessIn order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (eg your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA. Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with If you have received this in error or have any questions, please contact us at [email protected]. Thanks! |
gloo/allreduce.cc
Outdated
if (is_intra_node(context->size)) { | ||
shm(opts); | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move this into the allreduce() function above? fits a bit better there since we pick the algorithm at that level
may be nice to add a RING_LOCAL or RING_SHMEM algorithm selector as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your advice. We have added a new algorihtm selectordetail::AllreduceOptionsImpl::SHM and moved statement of checking intra-node to allreduce().
gloo/allreduce.cc
Outdated
@@ -153,6 +154,15 @@ void ring( | |||
const auto slot = Slot::build(kAllreduceSlotPrefix, opts.tag); | |||
const size_t totalBytes = opts.elements * opts.elementSize; | |||
|
|||
|
|||
if (is_intra_node(context->size)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should also check if the tensor is a CUDA tensor and bypass if it is
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For cuda check, I think it's not necessary here. If the input tensors' device is cuda, pytorch will either copy cuda tensors to cpu before calling gloo::allreduce or use gloo::CudaAllreduceRingChunked do reduce cuda input directly. In that case gloo::allreduce is only for cpu inputs. Refer to pytorch code at ProcessGroupGlooCuda.cpp
gloo/allreduce_shm.cc
Outdated
|
||
bool is_intra_node(const int size) { | ||
// must launch with torchrun | ||
auto local_size_string = std::getenv("LOCAL_WORLD_SIZE"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this check is safe -- for torchft for instance we often run with Gloo only cross host and if you're using an 8x8 configuration this would trigger shm logic for cross host comms
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my understanding, world_size is 64 and the local_size is 8 if we launch program with 8x8 configuration, which won't introduce shm allreduce.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gaopengff With hybrid parallelism we often create cross host groups that can be smaller than LOCAL_WORLD_SIZE.
I.e. with HSDP in a 2x8 configuration we could be using FSDP within one host and replicating across the hosts. Thus each worker has two instances of Gloo. Once that is size 8 for within the host and then the other with size 2 between each pair of ranks across hosts.
The opposite can happen as well. Say we're running with a tensor parallel dimension of 2 -- that means each 2 workers on one host will have a group of size 2 and this logic would incorrectly not use shm for that operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this hybrid scenario, checking for LOCAL_WORLD_SIZE from environment variables is not correct indeed. It's difficult to get local_world_size in gloo context. So I changed the code to check max_local_rank whether is to equal to world_size. Supposing we are running on 8x8 configuration:
Group1: 1x8 intra-node max_local_rank(8) == world_size(8) -> shm allreduce
Group2: 8x1 inter-node max_local_rank(1) != world_size(8) -> ring allreduce
Do you think the check is right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still need to fix the triggering logic + add some unit tests for this behavior
gloo/allreduce_shm.cc
Outdated
|
||
bool is_intra_node(const int size) { | ||
// must launch with torchrun | ||
auto local_size_string = std::getenv("LOCAL_WORLD_SIZE"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gaopengff With hybrid parallelism we often create cross host groups that can be smaller than LOCAL_WORLD_SIZE.
I.e. with HSDP in a 2x8 configuration we could be using FSDP within one host and replicating across the hosts. Thus each worker has two instances of Gloo. Once that is size 8 for within the host and then the other with size 2 between each pair of ranks across hosts.
The opposite can happen as well. Say we're running with a tensor parallel dimension of 2 -- that means each 2 workers on one host will have a group of size 2 and this logic would incorrectly not use shm for that operation.
@@ -131,14 +132,23 @@ void allreduce(const detail::AllreduceOptionsImpl& opts) { | |||
return; | |||
} | |||
|
|||
switch (opts.algorithm) { | |||
auto algorithm = opts.algorithm; | |||
if (is_intra_node(opts)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doing a broadcast prior to every single allreduce seems pretty expensive -- can we move this logic to the TCP init in createAndConnectAllPairs
? We have enough information there I believe to compute the topology since we have all the hostnames.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other option would be to inspect the pairs that are participating and check if they all share the same IP
This PR is for RFC #455. It has implemented shm allreduce.