-
Notifications
You must be signed in to change notification settings - Fork 1
Port AsyncFlow backends into RHAPSODY #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Summary of ChangesHello @mturilli, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request marks a significant step in the RHAPSODY project by integrating the core backend logic from AsyncFlow and establishing a robust, modern Python development environment. The changes aim to provide a 'drop-in' replacement for existing AsyncFlow backends, ensuring seamless transition and laying a solid foundation for future enhancements and expanded testing. The focus is on modularity, code quality, and ease of contribution, preparing RHAPSODY for its role in executing heterogeneous HPC-AI workflows. Highlights
Ignored Files
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR provides a comprehensive 'drop-in' replacement module for AsyncFlow backends by posting Rhapsody backends into the main repository. It establishes a complete Python module structure for 2025 with multiple execution backends, testing infrastructure, and development tools.
- Implements multiple execution backends (noop, concurrent, dask, radical_pilot) with proper async support
- Establishes comprehensive testing suite including integration, performance, and real-world scenario tests
- Sets up modern Python development infrastructure with proper packaging, CI/CD, and code quality tools
Reviewed Changes
Copilot reviewed 53 out of 57 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| src/rhapsody/ | Core backend implementation with discovery, base classes, and constants |
| tests/ | Comprehensive test suite covering integration, performance, and real-world scenarios |
| pyproject.toml | Modern Python packaging configuration with backend-specific dependencies |
| .pre-commit-config.yaml | Code quality automation with ruff, docformatter, and security checks |
| tox.ini | Multi-environment testing configuration for CI/CD |
Comments suppressed due to low confidence (1)
tests/test_backend_performance.py:1
- Using shell=True with subprocess.run can be a security risk. Consider using shlex.split(cmd) and shell=False to avoid shell injection vulnerabilities.
"""Performance and Load Tests for Rhapsody Backends.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
tests/test_realworld_integration.py
Outdated
| dependencies: List[str] = None | ||
| input_files: List[str] = None | ||
| output_files: List[str] = None | ||
| environment: Dict[str, str] = None | ||
| working_directory: str = None |
Copilot
AI
Sep 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Using mutable default arguments (None) is correct here, but consider using dataclasses.field(default_factory=list) and dataclasses.field(default_factory=dict) for cleaner initialization of mutable defaults in dataclasses.
tests/test_backend_functionality.py
Outdated
| # Quick wait to see if it processes | ||
| try: | ||
| await backend.wait_tasks([tasks[0]["uid"]], timeout=2.0) | ||
| except Exception: # noqa: S110 |
Copilot
AI
Sep 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Bare except clauses should catch more specific exceptions. Consider catching specific timeout-related exceptions instead of using a broad Exception catch.
| except Exception: # noqa: S110 | |
| except asyncio.TimeoutError: |
tests/test_asyncflow_integration.py
Outdated
|
|
||
| max_workers = self.resources.get("max_workers", 2) | ||
| executor = ThreadPoolExecutor(max_workers=max_workers) | ||
| self.backend = await rhapsody.get_backend(self.backend_name, executor) |
Copilot
AI
Sep 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The get_backend function is not async according to the discovery module, but it's being awaited here. This will cause a TypeError. Remove the await keyword.
| self.backend = await rhapsody.get_backend(self.backend_name, executor) | |
| self.backend = rhapsody.get_backend(self.backend_name, executor) |
| result = subprocess.run( # noqa: S602 | ||
| cmd, | ||
| shell=True, | ||
| capture_output=True, | ||
| text=True, | ||
| close_fds=True, | ||
| ) |
Copilot
AI
Sep 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using shell=True with subprocess.run poses a security risk. Consider using shlex.split() to parse the command and set shell=False to prevent shell injection attacks.
| "Intended Audience :: Developers", | ||
| ] | ||
| readme = "README.md" | ||
| requires-python = ">=3.9" |
Copilot
AI
Sep 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The minimum Python version is 3.9, but the tox configuration tests Python 3.13 which may not exist yet. Consider aligning the maximum test version with available Python releases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant and valuable refactoring by establishing a new backend-based architecture for RHAPSODY, which is a great step towards a more modular and extensible system. The introduction of a comprehensive development setup with pyproject.toml, pre-commit hooks, and detailed documentation is also commendable.
However, the review has identified several critical issues that need to be addressed. Many configuration files (pyproject.toml, tox.ini, .pre-commit-config.yaml) contain errors, likely due to copy-pasting from another project, which will prevent the development environment and CI from working correctly. There are also significant security vulnerabilities (shell injection) and fundamental implementation errors in the new backend classes (e.g., not implementing abstract methods). Furthermore, the README.md and CONTRIBUTING.md are out of sync with the new codebase, which will cause confusion for new users and contributors. Addressing these issues will be crucial for the stability and usability of the new architecture.
| cmd = " ".join([task["executable"]] + task.get("arguments", [])) | ||
| process = None | ||
|
|
||
| try: | ||
| process = await asyncio.create_subprocess_shell( | ||
| cmd, | ||
| stdout=asyncio.subprocess.PIPE, | ||
| stderr=asyncio.subprocess.PIPE, | ||
| close_fds=True, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using create_subprocess_shell with shell=True and a command string constructed from user-provided input is a critical security vulnerability, as it can lead to shell injection attacks. The same issue exists in the fallback subprocess.run call.
To fix this, you should use asyncio.create_subprocess_exec and pass the executable and arguments as a sequence, which avoids the shell and is safe from injection.
| cmd = " ".join([task["executable"]] + task.get("arguments", [])) | |
| process = None | |
| try: | |
| process = await asyncio.create_subprocess_shell( | |
| cmd, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| close_fds=True, | |
| ) | |
| cmd_list = [task["executable"]] + task.get("arguments", []) | |
| process = None | |
| try: | |
| process = await asyncio.create_subprocess_exec( | |
| *cmd_list, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| close_fds=True, | |
| ) |
| async def submit_tasks(self, tasks: list[dict]) -> None: | ||
| """Submit a list of tasks for execution. | ||
|
|
||
| Args: | ||
| tasks: A list of dictionaries containing task definitions and metadata. | ||
| Each task dictionary should contain the necessary information for | ||
| task execution. | ||
| """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The signature for submit_tasks is -> None, but some implementations like ConcurrentExecutionBackend return list[asyncio.Task]. This inconsistency should be resolved. Awaiting the returned tasks is a good pattern for managing task completion.
I suggest changing the base class signature to -> list[asyncio.Task] and ensuring all backends adhere to it. Backends that don't naturally return tasks can return an empty list.
| async def submit_tasks(self, tasks: list[dict]) -> None: | |
| """Submit a list of tasks for execution. | |
| Args: | |
| tasks: A list of dictionaries containing task definitions and metadata. | |
| Each task dictionary should contain the necessary information for | |
| task execution. | |
| """ | |
| async def submit_tasks(self, tasks: list[dict]) -> list[asyncio.Task]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is resolved, but I thought I had added mypy to the pre-commit hooks. I had not. I will do that, but @AymenFJA, this will also make me fix the type issues we had in the original code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AymenFJA, I started to fix them, but I noticed that I had a lot of them in the radical-pilot backend. I do not trust myself to modify that code too much, so for now, I will not address the type issues in there.
Removed link to learn more about the NSF-funded project.
AymenFJA
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Due to the critical situation of this work, we can move forward with merging this PR. We will revisit again and address the rest of it.
Documentation skeleton for website
This should provide a 'drop-in' replacement module for the current AsyncFlow backends. It includes the standard setup for a 2025 Python module and some unit and integration tests that will need to be properly organized and expanded (a follow-up PR will be opened once this is approved). Barring some minor fixes, the backends code should be exactly like the current main of AsyncFlow.