fea: support dcp collective op use nccl symmetric mem #21

Conversation
Signed-off-by: augusto.yjh <[email protected]>
Reviewer's Guide

Adds NCCL symmetric memory support for DCP collective operations, wiring it through distributed groups, allocator policy, DeepSeek v2 attention paths, and environment configuration, while tightening how collective buffers are allocated and captured.

Sequence diagram for DCP attention collectives with NCCL symmetric memory

sequenceDiagram
actor User
participant Engine as EngineEnvConfig
participant Dist as GroupCoordinator_DCP
participant Alloc as use_symmetric_memory
participant Attn as DeepSeekV2Attention
participant Utils as CPAttentionUtils
User->>Engine: start_server(--enable-symm-mem, SGLANG_DCP>1)
Engine->>Engine: set NCCL_NVLS_ENABLE
Engine->>Engine: set NCCL_GRAPH_MIXING_SUPPORT=0 (DCP>1)
Engine->>Dist: construct DCP group
Dist->>Dist: read enable_symm_mem from server_args
Dist->>Dist: read dcp_size from SGLANG_DCP
Dist->>Dist: symm_mem_enabled_for_group = True (for DCP group)
User->>Attn: run_forward()
Attn->>Dist: get_dcp_group()
Attn->>Alloc: use_symmetric_memory(Dist)
Alloc->>Dist: check symm_mem_enabled_for_group and world_size>1
Alloc-->>Attn: SymmetricMemoryContext
Attn->>Attn: with SymmetricMemoryContext: torch.cat(q_pe, q_nope_out)
Attn->>Dist: get_dcp_group().all_gather(combined)
Attn->>Alloc: use_symmetric_memory(Dist)
Alloc-->>Attn: SymmetricMemoryContext
Attn->>Attn: with SymmetricMemoryContext: clone attn_output, lse
Attn->>Utils: cp_lse_ag_out_rs(attn_output, lse, cp_group)
Utils->>Dist: cp_group.all_gather(cp_attn_lse, dim=0)
Utils->>Utils: correct_attn_out(...)
Utils->>Dist: cp_group.reduce_scatter_along_dim(out, dim=1)
Dist->>Alloc: use_symmetric_memory(Dist)
Alloc-->>Dist: SymmetricMemoryContext
Dist->>Dist: with SymmetricMemoryContext: allocate reduce_scatter output
Dist->>Dist: reduce_scatter_tensor(output_tensor, input_tensor)
Updated class diagram for distributed groups and symmetric memory policy

classDiagram
class GroupCoordinator {
+device
+device_module
+world_size
+device_group
+symm_mem_enabled_for_group : bool
+graph_capture(stream)
+reduce_scatter_along_dim(input_tensor, dim, op)
+reduce_scatter_tensor(output_tensor, input_tensor)
+all_gather(tensor, dim)
+rank_in_group : int
}
class SymmetricMemoryContext {
+SymmetricMemoryContext(group_coordinator)
+__enter__()
+__exit__()
}
class PyncclAllocatorHelpers {
+use_symmetric_memory(group_coordinator, disabled : bool) SymmetricMemoryContext|nullcontext
}
class DeepSeekV2Attention {
+forward_absorb_prepare(...)
+forward_absorb_core(...)
}
class CPTritonContext {
+CPTritonContext()
}
class CPAttentionUtils {
+cp_lse_ag_out_rs(cp_attn_out, cp_attn_lse, cp_group, ctx)
}
class EngineEnvConfig {
+_set_envs_and_config(server_args)
}
GroupCoordinator "1" --> "1" SymmetricMemoryContext : creates
PyncclAllocatorHelpers --> SymmetricMemoryContext : returns
PyncclAllocatorHelpers --> GroupCoordinator : uses
DeepSeekV2Attention --> PyncclAllocatorHelpers : use_symmetric_memory(get_dcp_group)
DeepSeekV2Attention --> GroupCoordinator : get_dcp_group
CPAttentionUtils --> CPTritonContext : creates
CPAttentionUtils --> GroupCoordinator : cp_group.all_gather()
CPAttentionUtils --> GroupCoordinator : cp_group.reduce_scatter_along_dim()
EngineEnvConfig --> GroupCoordinator : config via env vars
EngineEnvConfig --> PyncclAllocatorHelpers : influences NCCL behavior
Flow diagram for symmetric memory enablement per group and allocation

flowchart TD
A[Start group initialization] --> B[Read enable_symm_mem from server_args]
B --> C[Read dcp_size from SGLANG_DCP]
C --> D{dcp_size > 1?}
D -- Yes --> E{group_name == dcp?}
D -- No --> F[Allow symmetric memory for any group]
E -- Yes --> G[enable_symm_mem and DCP group]
E -- No --> H[Disable symmetric memory for this group]
G --> I[Set symm_mem_enabled_for_group = True]
F --> I
H --> J[Set symm_mem_enabled_for_group = False]
I --> K[GroupCoordinator constructed]
J --> K
subgraph Allocation_path
L["Call use_symmetric_memory(group_coordinator, disabled)"] --> M{disabled is True?}
M -- Yes --> N[Return nullcontext]
M -- No --> O{group_coordinator.symm_mem_enabled_for_group?}
O -- No --> N
O -- Yes --> P{group_coordinator.world_size == 1?}
P -- Yes --> N
P -- No --> Q[Return SymmetricMemoryContext]
end
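To make the allocation path above concrete, here is a minimal, self-contained sketch of the policy check the flow diagram describes. It assumes only the names shown in the diagrams (`use_symmetric_memory`, `symm_mem_enabled_for_group`, `SymmetricMemoryContext`); the real helper lives in `pynccl_allocator.py` and may differ in detail.

```python
from contextlib import nullcontext


class SymmetricMemoryContext:
    """Stand-in for the allocator-backed context manager (assumed API)."""

    def __init__(self, group_coordinator):
        self.group_coordinator = group_coordinator

    def __enter__(self):
        # The real context routes tensor allocations through the NCCL window
        # allocator so collectives on them can take the symmetric-memory path.
        return self

    def __exit__(self, exc_type, exc, tb):
        return False


def use_symmetric_memory(group_coordinator, disabled: bool = False):
    """Mirror of the allocation-path decision in the flow diagram (sketch only)."""
    if (
        disabled
        or not getattr(group_coordinator, "symm_mem_enabled_for_group", False)
        or group_coordinator.world_size == 1
    ):
        return nullcontext()  # no symmetric memory for this group
    return SymmetricMemoryContext(group_coordinator)
```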
Summary of Changes

Hello @Rythsman, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces support for NCCL symmetric memory in distributed collective operations, primarily targeting the DCP group. By conditionally enabling symmetric memory and optimizing related environment variables, the changes aim to enhance the performance of distributed tensor operations within the SGLang framework, particularly affecting memory allocation and data transfer efficiency during model execution.
Hey there - I've reviewed your changes - here's some feedback:
- In `GroupCoordinator.reduce_scatter_along_dim`, the context is entered with `with self.use_symmetric_memory(self):`; if `use_symmetric_memory` is already a bound method that wraps the allocator helper, this extra `self` argument is likely redundant or incorrect and could be simplified to match other call sites.
- The `symm_mem_enabled_for_group` flag currently relies on `group_name == "dcp"` and `SGLANG_DCP`; consider centralizing this DCP-detection logic (and avoiding hard-coded group-name strings) so that future changes to group naming or DCP configuration don't silently break symmetric-memory routing (see the sketch after this list).
- In `deepseek_v2.forward_absorb_core`, the `clone()` operations for `attn_output` and `lse` under `enable_symm_mem` are on a hot path; if possible, reuse buffers or ensure they are allocated with the NCCL allocator earlier to avoid extra allocations and copies for every forward pass.
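As a rough illustration of the second point, the DCP detection could live in one helper instead of being duplicated at each call site. The helper name and the `"dcp"` constant below are hypothetical, not the actual SGLang API; the real code reads `SGLANG_DCP` and compares the group name inline.

```python
import os

DCP_GROUP_NAME = "dcp"  # hypothetical constant for the hard-coded group name


def symm_mem_enabled_for_group(group_name: str, enable_symm_mem: bool) -> bool:
    """Decide whether a coordinator group should use NCCL symmetric memory."""
    if not enable_symm_mem:
        return False
    dcp_size = int(os.getenv("SGLANG_DCP", "1") or "1")
    if dcp_size > 1:
        # With DCP enabled, only the DCP group uses symmetric memory.
        return group_name == DCP_GROUP_NAME
    # DCP disabled: keep the original behavior for all groups.
    return True
```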
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `GroupCoordinator.reduce_scatter_along_dim`, the context is entered with `with self.use_symmetric_memory(self):`; if `use_symmetric_memory` is already a bound method that wraps the allocator helper, this extra `self` argument is likely redundant or incorrect and could be simplified to match other call sites.
- The `symm_mem_enabled_for_group` flag currently relies on `group_name == "dcp"` and `SGLANG_DCP`; consider centralizing this DCP-detection logic (and avoiding hard-coded group-name strings) so that future changes to group naming or DCP configuration don’t silently break symmetric-memory routing.
- In `deepseek_v2.forward_absorb_core`, the `clone()` operations for `attn_output` and `lse` under `enable_symm_mem` are on a hot path; if possible, reuse buffers or ensure they are allocated with the NCCL allocator earlier to avoid extra allocations and copies for every forward pass.
## Individual Comments
### Comment 1
<location> `python/sglang/srt/distributed/parallel_state.py:321-325` </location>
<code_context>
+ # - When enable_symm_mem is on and DCP is enabled (SGLANG_DCP > 1),
+ # only the DCP group should use SymmetricMemoryContext.
+ # - When DCP is disabled, keep the original behavior.
+ try:
+     from sglang.srt.server_args import get_global_server_args
+
+     enable_symm_mem = bool(get_global_server_args().enable_symm_mem)
+ except Exception:
+     enable_symm_mem = False
+ try:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Catching all Exceptions when reading `enable_symm_mem` may hide configuration errors.
Using `except Exception:` means any bug in `get_global_server_args()` (e.g., typos, import errors) will silently disable symmetric memory. Consider catching only the specific expected exceptions (such as `ImportError`/`AttributeError`) and/or logging when the fallback is used so configuration issues are detectable.
Suggested implementation:
```python
import logging
import os
logger = logging.getLogger(__name__)
```
```python
# Cache symmetric memory policy for this group.
# - When enable_symm_mem is on and DCP is enabled (SGLANG_DCP > 1),
# only the DCP group should use SymmetricMemoryContext.
# - When DCP is disabled, keep the original behavior.
try:
    from sglang.srt.server_args import get_global_server_args

    enable_symm_mem = bool(get_global_server_args().enable_symm_mem)
except (ImportError, AttributeError) as exc:
    logger.warning(
        "Symmetric memory disabled due to configuration issue reading "
        "enable_symm_mem: %s",
        exc,
    )
    enable_symm_mem = False
try:
    dcp_size = int(os.getenv("SGLANG_DCP", "1") or "1")
except Exception:
    dcp_size = 1
self.symm_mem_enabled_for_group = bool(
```
If `import os` is not present or is in a different location in `parallel_state.py`, adjust the first SEARCH/REPLACE block so that `import logging` and the `logger = logging.getLogger(__name__)` initialization are inserted alongside the existing top-level imports for this module.
</issue_to_address>
### Comment 2
<location> `python/sglang/srt/distributed/device_communicators/pynccl_allocator.py:186` </location>
<code_context>
- or disabled
+ if (
+ disabled
+ or not getattr(group_coordinator, "symm_mem_enabled_for_group", False)
or group_coordinator.world_size == 1
- )
</code_context>
<issue_to_address>
**question (bug_risk):** Switching from a global `is_symmetric_memory_enabled()` check to a per-group attribute may change behavior for other coordinators.
This now bases `use_symmetric_memory` solely on `group_coordinator.symm_mem_enabled_for_group`, defaulting to `False` if absent. Callers that previously relied on the global `is_symmetric_memory_enabled()` but don’t set this attribute will lose symmetric memory with no warning. If that’s intended, please ensure all coordinator types initialize `symm_mem_enabled_for_group`; otherwise, consider OR-ing this attribute with the global flag to keep existing behavior.
</issue_to_address>
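For illustration of the preceding comment, an OR-based guard could look roughly like the following; `is_symmetric_memory_enabled` and `SymmetricMemoryContext` are the names already referenced in this review, and this is a sketch rather than the final code.

```python
from contextlib import nullcontext


def use_symmetric_memory(group_coordinator, disabled: bool = False):
    # OR the per-group flag with the pre-existing global switch so coordinators
    # that never set the attribute keep their old behavior.
    enabled = (
        getattr(group_coordinator, "symm_mem_enabled_for_group", False)
        or is_symmetric_memory_enabled()  # existing global helper (assumed)
    )
    if disabled or not enabled or group_coordinator.world_size == 1:
        return nullcontext()
    return SymmetricMemoryContext(group_coordinator)  # existing class (assumed)
```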
### Comment 3
<location> `python/sglang/srt/layers/attention/utils.py:365` </location>
<code_context>
- (cp_group.world_size,) + cp_attn_lse.shape,
- dtype=cp_attn_lse.dtype,
- device=cp_attn_lse.device,
+ lses = cp_group.all_gather(cp_attn_lse, dim=0).view(
+ (cp_group.world_size,) + cp_attn_lse.shape
)
</code_context>
<issue_to_address>
**suggestion (performance):** Removing the explicit `contiguous()` on `cp_attn_lse` may affect performance or assumptions in `all_gather`.
`cp_attn_lse` is now passed to `all_gather` without being made contiguous first, so non-contiguous inputs may incur extra internal copies in NCCL and change performance characteristics. If non-contiguous tensors are expected here, consider retaining `.contiguous()` (or enforcing contiguity at the call site) to avoid hidden perf regressions.
</issue_to_address>
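A small sketch of the suggested call-site fix in `cp_lse_ag_out_rs`, reusing the variables from the code context above:

```python
# Enforce contiguity before the collective so NCCL does not need a hidden
# internal copy when cp_attn_lse arrives non-contiguous.
lses = cp_group.all_gather(cp_attn_lse.contiguous(), dim=0).view(
    (cp_group.world_size,) + cp_attn_lse.shape
)
```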
### Comment 4
<location> `python/sglang/srt/entrypoints/engine.py:701-706` </location>
<code_context>
  os.environ["NCCL_NVLS_ENABLE"] = str(
      int(server_args.enable_nccl_nvls or server_args.enable_symm_mem)
  )
+ if "NCCL_GRAPH_MIXING_SUPPORT" not in os.environ or server_args.enable_symm_mem:
+     dcp_size = int(os.getenv("SGLANG_DCP", "1") or "1")
+     # Note(wh): NCCL_GRAPH_MIXING_SUPPORT=0 can help improve performance for symmetric kernels.
+     # details in https://github.com/NVIDIA/nccl-tests/issues/333#issuecomment-3103636985
+     if dcp_size > 1:
+         os.environ["NCCL_GRAPH_MIXING_SUPPORT"] = "0"
  os.environ["CUDA_DEVICE_MAX_CONNECTIONS"] = "8"
  os.environ["CUDA_MODULE_LOADING"] = "AUTO"
</code_context>
<issue_to_address>
**question:** Overriding user-provided `NCCL_GRAPH_MIXING_SUPPORT` when `enable_symm_mem` is true might be surprising.
This logic will override an explicitly set `NCCL_GRAPH_MIXING_SUPPORT` whenever `enable_symm_mem` is true and `dcp_size > 1`. Since this is a performance tuning knob, consider only setting it when the env var is unset, or clearly documenting that `enable_symm_mem` intentionally takes precedence over user configuration.
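A minimal sketch of the "only set when unset" variant; the helper name is hypothetical, while `server_args.enable_symm_mem` and `SGLANG_DCP` are the flags already used in this PR.

```python
import os


def maybe_tune_graph_mixing(server_args) -> None:
    """Set NCCL_GRAPH_MIXING_SUPPORT=0 only when the user has not set it."""
    if "NCCL_GRAPH_MIXING_SUPPORT" in os.environ:
        return  # respect explicit user configuration
    dcp_size = int(os.getenv("SGLANG_DCP", "1") or "1")
    if server_args.enable_symm_mem and dcp_size > 1:
        os.environ["NCCL_GRAPH_MIXING_SUPPORT"] = "0"
```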
</issue_to_address>
Code Review
This pull request adds support for using NCCL symmetric memory for DCP collective operations, which is a good performance enhancement. The changes are well-structured and correctly implement the feature. My review includes a few suggestions to improve code quality and maintainability, such as refining exception handling, removing code duplication, and eliminating a redundant operation. Overall, the implementation is solid.
try:
    from sglang.srt.server_args import get_global_server_args

    enable_symm_mem = bool(get_global_server_args().enable_symm_mem)
except Exception:
    enable_symm_mem = False
try:
    dcp_size = int(os.getenv("SGLANG_DCP", "1") or "1")
except Exception:
    dcp_size = 1
Using `except Exception` is too broad and can hide unexpected errors, making debugging more difficult. It's better to catch more specific exceptions. `get_global_server_args()` raises a `ValueError` if not initialized, and `int()` raises a `ValueError` for invalid string conversions. Consider catching `ValueError` and other potential specific exceptions like `ImportError` or `AttributeError`.
except (ImportError, AttributeError, ValueError):
    enable_symm_mem = False
try:
    dcp_size = int(os.getenv("SGLANG_DCP", "1") or "1")
except ValueError:
    dcp_size = 1
if get_global_server_args().enable_symm_mem:
    # Note(wh): make sure input tensors use nccl allocator
    with use_symmetric_memory(get_dcp_group()):
        attn_output = attn_output.clone(
            memory_format=torch.contiguous_format
        )
        lse = lse.clone(memory_format=torch.contiguous_format)
attn_output = attn_output.contiguous()
attn_output = cp_lse_ag_out_rs(attn_output, lse, get_dcp_group())
The call to `attn_output.contiguous()` on line 2185 is redundant when `get_global_server_args().enable_symm_mem` is true, because `attn_output` is already cloned with `memory_format=torch.contiguous_format` inside the `if` block. To avoid this redundancy and make the intent clearer, you can move the `.contiguous()` call into an `else` block.
Suggested change:

if get_global_server_args().enable_symm_mem:
    # Note(wh): make sure input tensors use nccl allocator
    with use_symmetric_memory(get_dcp_group()):
        attn_output = attn_output.clone(
            memory_format=torch.contiguous_format
        )
        lse = lse.clone(memory_format=torch.contiguous_format)
else:
    attn_output = attn_output.contiguous()
attn_output = cp_lse_ag_out_rs(attn_output, lse, get_dcp_group())
Looks about the same as what I have here. If enabling it end to end shows no improvement right now, capture a timeline and send it to me and I'll help analyze it.

@Rythsman I took a look at the test script. Is this comparison appropriate? symm_mem should only change how tensor GPU memory is allocated, but one of these two tests uses CUDA graph and the other does not, so there are too many variables. I'll run a comparison locally and check.
…n is incompatible with TP group symm-mem. Modifications will be made after the resolution of the multi-group symmetric memory coexistence issue.)
misc: remove unneeded code after rebase
fix: fix ar coredump when dcp use symmetric memory
fea: add symm-mem unit perf test
Motivation
Modifications
Accuracy Tests
Benchmarking and Profiling
tp16:
dcp8tp16:
Note
Checklist
Summary by Sourcery
Enable NCCL symmetric memory for DCP collectives and integrate it into distributed execution and environment configuration.
New Features:
Enhancements: