# [feat] support deepspeed elastic #6955
base: main
## Conversation
### Summary of Changes

Hello @meichangsu1, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the training capabilities by integrating DeepSpeed elastic training. This feature allows the system to dynamically scale training resources, ensuring efficient utilization and resilience against resource fluctuations. It includes robust mechanisms for graceful shutdowns and intelligent checkpoint management, which are vital for maintaining training progress in dynamic environments. Furthermore, the changes enable automatic adjustment of key training parameters to optimize performance under elastic conditions, all supported by clear and detailed documentation.
### Code Review
This pull request adds support for DeepSpeed elastic training, which is a great new feature. The implementation includes changes to training arguments, callbacks for graceful exits, and modifications to the trainer for handling elastic checkpoints and configurations. I've reviewed the changes and have some feedback, mostly related to documentation formatting, code consistency, and a potential bug in checkpoint saving. Overall, the changes look good and the new functionality is well-integrated.
```python
        os.path.join(output_dir, f'rng_state_{self.args.process_index}.pth'),
    )
    if self.args.save_safetensors:
        torch.save({'safe_serialization': True}, 'safe_serialization')
```
The call to `torch.save` (which is patched to `self.flash_checkpointer.ckpt_agent.save`) is saving a file named `safe_serialization` to the current working directory. This is likely incorrect and will pollute the CWD. It should probably be saved within the `output_dir` like other checkpoint files.
```diff
- torch.save({'safe_serialization': True}, 'safe_serialization')
+ torch.save({'safe_serialization': True}, os.path.join(output_dir, 'safe_serialization'))
```
No need to save to a directory; we just let the saver know whether `safe_serialization` is true or false.
```
- micro_batch_sizes:Equivalent to train_micro_batch_size_per_gpu.
- min_gpus:Minimum number of GPUs.
- max_gpus:Maximum number of GPUs.
For more details, see:[Deepspeed]https://www.deepspeed.ai/docs/config-json/#elastic-training-config-v01-and-v02
```
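For orientation, here is a minimal sketch of the `elasticity` block these keys belong to, following the DeepSpeed elastic-training config schema linked above; the concrete values are illustrative only, not recommendations from this PR:

```python
# Illustrative DeepSpeed config with an elasticity block (example values only).
ds_config = {
    'elasticity': {
        'enabled': True,
        'max_train_batch_size': 2048,    # upper bound on the global train batch size
        'micro_batch_sizes': [2, 4, 8],  # candidate train_micro_batch_size_per_gpu values
        'min_gpus': 1,                   # smallest GPU count to scale down to
        'max_gpus': 64,                  # largest GPU count to scale up to
        'version': 0.2,                  # elasticity config schema version (0.1 or 0.2)
    },
}
```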
`docs/source/BestPractices/Elastic.md` (Outdated)
```
- micro_batch_sizes:即train_micro_batch_size_per_gpu
- min_gpus:最小gpu数目
- max_gpus:最大gpu数目
更详细的内容见:[Deepspeed]https://www.deepspeed.ai/docs/config-json/#elastic-training-config-v01-and-v02
```
```
- eval_generation_config: 评测时模型推理配置,json格式,默认为`{'max_tokens': 512}`。
- use_flash_ckpt: 是否启用[DLRover Flash Checkpoint](https://github.com/intelligent-machine-learning/dlrover)的flash checkpoint。默认为`false`,启用后,权重会先保存至共享内存,之后异步持久化,目前暂不支持safetensors格式;建议搭配`PYTORCH_CUDA_ALLOC_CONF="expandable_segments:True"` 一起使用,避免训练过程CUDA OOM。
- use_flash_ckpt: 是否启用[DLRover Flash Checkpoint](https://github.com/intelligent-machine-learning/dlrover)的flash checkpoint。默认为`false`,启用后,权重会先保存至共享内存,之后异步持久化;建议搭配`PYTORCH_CUDA_ALLOC_CONF="expandable_segments:True"` 一起使用,避免训练过程CUDA OOM。
- elastic:是否启用弹性,依赖[DLRover]https://github.com/intelligent-machine-learning/dlrover,`pip install dlrover && pip install tornado && pip install kubernetes `,具体使用参考[示例](../BestPractices/Elastic.md)
```
There are a few formatting issues on this line:

- The colon `:` is a full-width character. It should be a half-width colon `:`.
- The Markdown link for DLRover is broken. It should be `[DLRover](...)`.
- The comma `,` is a full-width character. It should be a half-width comma `,`.
```diff
- - elastic:是否启用弹性,依赖[DLRover]https://github.com/intelligent-machine-learning/dlrover,`pip install dlrover && pip install tornado && pip install kubernetes `,具体使用参考[示例](../BestPractices/Elastic.md)
+ - elastic: 是否启用弹性,依赖[DLRover](https://github.com/intelligent-machine-learning/dlrover),`pip install dlrover && pip install tornado && pip install kubernetes `,具体使用参考[示例](../BestPractices/Elastic.md)
```
```
## Installing Dependencies

Deploy a K8S cluster and deploy[DLRover]https://github.com/intelligent-machine-learning/dlrover in the cluster,Install the required packages using `pip install dlrover && pip install tornado && pip install kubernetes && pip install ms-swift`
```
The Markdown link for DLRover is broken, and there's a missing space after it.
```diff
- Deploy a K8S cluster and deploy[DLRover]https://github.com/intelligent-machine-learning/dlrover in the cluster,Install the required packages using `pip install dlrover && pip install tornado && pip install kubernetes && pip install ms-swift`
+ Deploy a K8S cluster and deploy [DLRover](https://github.com/intelligent-machine-learning/dlrover) in the cluster, and install the required packages using `pip install dlrover && pip install tornado && pip install kubernetes && pip install ms-swift`
```
`swift/trainers/mixin.py` (Outdated)
```python
resume_dir = get_resume_dir(self.args.output_dir)
if resume_dir is None:
    return False
tracer_file = os.path.join(resume_dir, 'dlrover_latest.txt')
if not os.path.exists(tracer_file):
    step = 0
if step == 0:
    return False
```
The function `get_resume_checkpoint` returns `False` in case of failure. It would be cleaner to return `None` to be consistent with an `Optional[str]` return type. While the current implementation works due to how boolean values are evaluated, returning `None` is better practice.
```diff
  resume_dir = get_resume_dir(self.args.output_dir)
  if resume_dir is None:
-     return False
+     return None
  tracer_file = os.path.join(resume_dir, 'dlrover_latest.txt')
  if not os.path.exists(tracer_file):
      step = 0
  if step == 0:
-     return False
+     return None
```
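For reference, a standalone sketch of the `Optional[str]`-returning shape the reviewer suggests, with the elided step lookup filled in; reading a step count from `dlrover_latest.txt` and the `checkpoint-{step}` directory layout are assumptions for illustration, not confirmed by the diff:

```python
import os
from typing import Optional


def get_resume_checkpoint(resume_dir: Optional[str]) -> Optional[str]:
    """Return a resumable checkpoint path, or None when nothing is resumable."""
    if resume_dir is None:
        return None
    tracer_file = os.path.join(resume_dir, 'dlrover_latest.txt')
    step = 0
    if os.path.exists(tracer_file):
        with open(tracer_file) as f:
            # Assumption: the tracer file records the latest persisted step.
            step = int(f.read().strip())
    if step == 0:
        return None
    # Hypothetical layout: checkpoints live in step-named subdirectories.
    return os.path.join(resume_dir, f'checkpoint-{step}')
```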
`swift/trainers/mixin.py` (Outdated)
```python
def get_resume_checkpoint_until_find_ucp(self):
    resume_dir = get_resume_dir(self.args.output_dir)
    tracer_file = os.path.join(resume_dir, 'ucp.txt')
    if not os.path.exists(tracer_file):
        step = 0
    if step == 0:
        return False
```
Similar to `get_resume_checkpoint`, this function returns `False` on failure. It would be better to return `None` for consistency and clarity.
```diff
  def get_resume_checkpoint_until_find_ucp(self):
      resume_dir = get_resume_dir(self.args.output_dir)
      tracer_file = os.path.join(resume_dir, 'ucp.txt')
      if not os.path.exists(tracer_file):
          step = 0
      if step == 0:
-         return False
+         return None
```
`docs/source/BestPractices/Elastic.md` (Outdated)
```
`pip install dlrover && pip install tornado && pip install kubernetes && pip install ms-swift`

经过反复测试验证的训练镜像中的其它依赖以及版本:
deepseed 0.16.5(需参考 https://github.com/deepspeedai/DeepSpeed/pull/7585/files 修复universal checkpoint 相关问题)
```
```python
def extract_version(name: str) -> Optional[int]:
    if not name.startswith('v'):
        return None
    try:
        num = name[1:].split('-', 1)[0]
        return int(num)
    except ValueError:
        return None


def get_previous_version_from_path(current_path: str) -> Optional[str]:
    from pathlib import Path
    current = Path(current_path)
    parent = current.parent
    current_name = current.name

    candidates = [d for d in parent.iterdir() if d.is_dir()]

    valid = [(d.name, extract_version(d.name)) for d in candidates]
    valid = [(name, ver) for name, ver in valid if ver is not None]

    valid.sort(key=lambda x: x[1])
    names = [name for name, _ in valid]

    if current_name not in names:
        return None

    idx = names.index(current_name)
    if idx == 0:
        return None

    prev_name = names[idx - 1]
    return str(parent / prev_name)
```
The functions `extract_version` and `get_previous_version_from_path` are duplicated in `swift/utils/utils.py`. To avoid code duplication, these functions should be defined in one place (e.g., `swift/utils/utils.py`) and imported where needed. Please remove the duplicated definitions from this file and import them instead.
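As a quick illustration of what these helpers compute (the directory names below are hypothetical):

```python
# extract_version() accepts 'v'-prefixed names, ignoring any '-' suffix.
assert extract_version('v2-foo') == 2
assert extract_version('v10') == 10
assert extract_version('checkpoint') is None  # no 'v' prefix

# With sibling directories v1, v2-foo and v10 under /ckpts, versions sort
# numerically, so get_previous_version_from_path('/ckpts/v10') returns
# '/ckpts/v2-foo', and get_previous_version_from_path('/ckpts/v1') returns None.
```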
`swift/utils/shutdown_manager.py` (Outdated)
| """ | ||
| 用于捕获信号 (e.g. SIGTERM, SIGINT, custom) 并设置 shutdown flag。 | ||
| 供训练 / 主流程 / callback 查询是否收到停止请求。 | ||
| """ |
The docstrings and comments in this file are in Chinese. For better maintainability and consistency with the rest of the codebase, it would be beneficial to translate them to English.
| """ | |
| 用于捕获信号 (e.g. SIGTERM, SIGINT, custom) 并设置 shutdown flag。 | |
| 供训练 / 主流程 / callback 查询是否收到停止请求。 | |
| """ | |
| """ | |
| Catches signals (e.g. SIGTERM, SIGINT, custom) and sets a shutdown flag. | |
| Used by training / main process / callbacks to check for stop requests. | |
| """ |
- Remove deepspeed elasticity configuration from TrainArguments
- Replace ShutdownManager import with DeepspeedElasticCallBack plugin
- Remove elasticity batch size calculations from SwiftMixin and DataLoaderMixin
- Add DeepspeedElasticCallBack to callbacks when elastic training is enabled

- Add DeepspeedElasticCallBack import to plugin `__init__.py`
- Fix indentation and formatting in deepspeed_elastic.py
- Remove duplicate import and clean up whitespace
- Ensure proper elastic configuration setup for deepspeed training
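The commits reference a `DeepspeedElasticCallBack` plugin without showing its body; a graceful-exit callback in the HF Trainer style typically looks like the hedged sketch below, where wiring it to a shutdown manager is an assumption based on the commit messages, not the PR's confirmed implementation:

```python
from transformers import TrainerCallback


class DeepspeedElasticCallBack(TrainerCallback):
    """Hypothetical sketch: checkpoint and stop cleanly on a shutdown signal."""

    def __init__(self, shutdown_manager):
        self.shutdown_manager = shutdown_manager

    def on_step_end(self, args, state, control, **kwargs):
        if self.shutdown_manager.should_stop():
            # Persist a checkpoint before the elastic agent tears the worker down.
            control.should_save = True
            control.should_training_stop = True
        return control
```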
### PR type

### PR information

Support DeepSpeed elastic training.

### Experiment results

Paste your experiment result here (if needed).