Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c765fa9
Use rhapsody for backends, maintain compatibility layer
mturilli Sep 27, 2025
5911079
Install rhapsody from GH@dev
mturilli Sep 27, 2025
3dbfc6d
Fix and extend linting
mturilli Sep 27, 2025
44e1c3e
Fix overeager ruff type fixing
mturilli Sep 27, 2025
5efc4aa
Fix pytest-asyncio scope mismatch error
mturilli Sep 27, 2025
229e9f3
RP init is very slow. Share async scope on for RP int tests
mturilli Sep 27, 2025
f490381
Crate a backends-specific scope
mturilli Sep 27, 2025
f104023
Add an explicit loop_scope. Got to love RP...
mturilli Sep 27, 2025
a18065b
Make it DRY!
mturilli Sep 27, 2025
b79a3d2
Implement a registry and factory for backends
mturilli Sep 28, 2025
d49ace2
Make rhapsody facultative and autoload backends
mturilli Sep 28, 2025
90441ff
Fix CI testing issues.
mturilli Sep 28, 2025
5775577
Fixing unit tests, still issues with the integration ones
mturilli Sep 28, 2025
8bb6ab9
Use patch targets for 3.9/10, worked with 3.11+
mturilli Sep 28, 2025
a1655da
Knowing more than I wanted to about 3.9
mturilli Sep 28, 2025
eff3819
Address Gemini review
mturilli Sep 28, 2025
5f98a11
Address Gemini review: remove obsolete register_optional_backends()
mturilli Sep 28, 2025
84c640d
Update documentation with new backend factory
mturilli Sep 28, 2025
2f60d8d
Update and test examples
mturilli Sep 28, 2025
1c22351
Export to JSON...
mturilli Sep 28, 2025
785814e
Well, maybe better we use the work we did...
mturilli Sep 28, 2025
d43814c
From inheritance to duck-typing validation. We may review this in the…
mturilli Sep 28, 2025
d36b073
Using a protocol-based type system to use rhapsody different type system
mturilli Sep 28, 2025
a5d2af9
Unit tests now use the type protocol
mturilli Sep 28, 2025
36622b0
Use protocol for tests
mturilli Sep 29, 2025
d5c1314
Use an available backend for unit tests
mturilli Sep 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,21 @@ cython_debug/
# PyPI configuration file
.pypirc
asyncflow.session.*

# Ignore vscode settings
.vscode/

# Ignore GH instructions
.github/instructions

# Ignore codacy settings
.codacy/

# Ignore devcontainer settings
.devcontainer/

# Ignore direnv settings
.direnv/

# Ignore dotenv settings
.envrc
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ requires-python = ">=3.9"
dependencies = [
"pydantic",
"typeguard",
"requests"
"requests",
"rhapsody @ git+https://github.com/radical-cybertools/rhapsody.git@dev"
]

[project.urls]
Expand Down
33 changes: 24 additions & 9 deletions src/radical/asyncflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,36 @@

import importlib.metadata as importlib_metadata

from .backends.execution.concurrent import ConcurrentExecutionBackend
from .backends.execution.dask_parallel import DaskExecutionBackend
from .backends.execution.noop import NoopExecutionBackend
from .backends.execution.radical_pilot import RadicalExecutionBackend
# Import backends from rhapsody through our wrapper
from .backends.execution import ConcurrentExecutionBackend, NoopExecutionBackend

# Try to import optional backends
try:
from .backends.execution import DaskExecutionBackend
except ImportError:
DaskExecutionBackend = None

try:
from .backends.execution import RadicalExecutionBackend
except ImportError:
RadicalExecutionBackend = None

from .data import InputFile, OutputFile
from .workflow_manager import WorkflowEngine

__version__ = importlib_metadata.version("radical.asyncflow")

__all__ = [
"ConcurrentExecutionBackend",
"DaskExecutionBackend",
"NoopExecutionBackend",
"RadicalExecutionBackend",
"WorkflowEngine",
"InputFile",
"OutputFile",
"WorkflowEngine",
"ConcurrentExecutionBackend",
"NoopExecutionBackend",
]

# Add optional backends to __all__ if they exist
if DaskExecutionBackend is not None:
__all__.append("DaskExecutionBackend")

if RadicalExecutionBackend is not None:
__all__.append("RadicalExecutionBackend")
35 changes: 35 additions & 0 deletions src/radical/asyncflow/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Backend subsystem for AsyncFlow using Rhapsody backends.

This module provides execution backends from the Rhapsody package for running
scientific workflows on various computing infrastructures.
"""

from __future__ import annotations

# Import execution backends from rhapsody
from rhapsody.backends.execution import ConcurrentExecutionBackend, NoopExecutionBackend

# Import base class and session from our compatibility layer
from .base import BaseExecutionBackend, Session

__all__ = [
"BaseExecutionBackend",
"Session",
"NoopExecutionBackend",
"ConcurrentExecutionBackend",
]

# Add optional backends that may be available
try:
from rhapsody.backends.execution import DaskExecutionBackend

__all__.append("DaskExecutionBackend")
except ImportError:
pass

try:
from rhapsody.backends.execution import RadicalExecutionBackend

__all__.append("RadicalExecutionBackend")
except ImportError:
pass
162 changes: 162 additions & 0 deletions src/radical/asyncflow/backends/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Base execution backend compatibility layer.
This module provides a compatibility layer for rhapsody backends to work
with AsyncFlow's type system.
"""

from __future__ import annotations

import os
from abc import ABC, abstractmethod

# Import the original rhapsody base class
try:
from rhapsody.backends import \
BaseExecutionBackend as RhapsodyBaseExecutionBackend
from rhapsody.backends import Session as RhapsodySession

# Create compatibility aliases
BaseExecutionBackend = RhapsodyBaseExecutionBackend
Session = RhapsodySession

except ImportError:
# Fallback to local definitions if rhapsody is not available
class BaseExecutionBackend(ABC):
"""Abstract base class for execution backends that manage task execution and state.
This class defines the interface for execution backends that handle task submission,
state management, and dependency linking in a distributed or parallel execution
environment.
"""

@abstractmethod
async def submit_tasks(self, tasks: list[dict]) -> None:
"""Submit a list of tasks for execution.
Args:
tasks: A list of dictionaries containing task definitions and metadata.
Each task dictionary should contain the necessary information for
task execution.
"""
pass

@abstractmethod
async def shutdown(self) -> None:
"""Gracefully shutdown the execution backend.
This method should clean up resources, terminate running tasks if necessary,
and prepare the backend for termination.
"""
pass

@abstractmethod
def state(self) -> str:
"""Get the current state of the execution backend.
Returns:
A string representing the current state of the backend (e.g., 'running',
'idle', 'shutting_down', 'error').
"""
pass

@abstractmethod
def task_state_cb(self, task: dict, state: str) -> None:
"""Callback function invoked when a task's state changes.
Args:
task: Dictionary containing task information and metadata.
state: The new state of the task (e.g., 'pending', 'running', 'completed',
'failed').
"""
pass

@abstractmethod
def register_callback(self, func) -> None:
"""Register a callback function for task state changes.
Args:
func: A callable that will be invoked when task states change.
The function should accept task and state parameters.
"""
pass

@abstractmethod
def get_task_states_map(self) -> None:
"""Retrieve a mapping of task IDs to their current states.
Returns:
A dictionary mapping task identifiers to their current execution states.
"""
pass

@abstractmethod
def build_task(self, task: dict) -> None:
"""Build or prepare a task for execution.
Args:
task: Dictionary containing task definition, parameters, and metadata
required for task construction.
"""
pass

@abstractmethod
def link_implicit_data_deps(self, src_task, dst_task):
"""Link implicit data dependencies between two tasks.
Creates a dependency relationship where the destination task depends on
data produced by the source task, with the dependency being inferred
automatically.
Args:
src_task: The source task that produces data.
dst_task: The destination task that depends on the source task's output.
"""
pass

@abstractmethod
def link_explicit_data_deps(
self, src_task=None, dst_task=None, file_name=None, file_path=None
):
"""Link explicit data dependencies between tasks or files.
Creates explicit dependency relationships based on specified file names
or paths, allowing for more precise control over task execution order.
Args:
src_task: The source task that produces the dependency.
dst_task: The destination task that depends on the source.
file_name: Name of the file that represents the dependency.
file_path: Full path to the file that represents the dependency.
"""
pass

@abstractmethod
async def cancel_task(self, uid: str) -> bool:
"""
Cancel a task in the execution backend.
Args:
uid: Task identifier
Raises:
NotImplementedError: If the backend doesn't support cancellation
"""
raise NotImplementedError("Not implemented in the base backend")


class Session:
"""Manages execution session state and working directory.
This class maintains session-specific information including the current
working directory path for task execution.
"""

def __init__(self):
"""Initialize a new session with the current working directory.
Sets the session path to the current working directory at the time
of initialization.
"""
self.path = os.getcwd()

__all__ = ["BaseExecutionBackend", "Session"]
29 changes: 29 additions & 0 deletions src/radical/asyncflow/backends/execution/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Execution backends for AsyncFlow using Rhapsody backends.

This module re-exports all execution backends from rhapsody.backends.execution.
"""

from __future__ import annotations

# Import execution backends from rhapsody
from rhapsody.backends.execution import ConcurrentExecutionBackend, NoopExecutionBackend

# Import base class for compatibility
from ..base import BaseExecutionBackend

__all__ = ["BaseExecutionBackend", "NoopExecutionBackend", "ConcurrentExecutionBackend"]

# Add optional backends
try:
from rhapsody.backends.execution import DaskExecutionBackend

__all__.append("DaskExecutionBackend")
except ImportError:
pass

try:
from rhapsody.backends.execution import RadicalExecutionBackend

__all__.append("RadicalExecutionBackend")
except ImportError:
pass
Loading
Loading