diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index 40d402e..c4e640d 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -9,6 +9,7 @@ on:
 
 permissions:
   contents: write
+
 jobs:
   deploy:
     runs-on: ubuntu-latest
diff --git a/README.md b/README.md
index 4348cfc..e35578f 100644
--- a/README.md
+++ b/README.md
@@ -52,23 +52,21 @@ from concurrent.futures import ThreadPoolExecutor
 async def main():
     # Create backend and workflow
     backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-    flow = await WorkflowEngine.create(backend=backend)
-
-    @flow.executable_task
-    async def task1():
-        return "/bin/echo 5"
-
-    @flow.function_task
-    async def task2(t1_result):
-        return int(t1_result.strip()) * 2 * 2
-
-    # create the workflow
-    t1_fut = task1()
-    t2_result = await task2(t1_fut) # t2 depends on t1 (waits for it)
-
-    print(t2_result)
-    # shutdown the execution backend
-    await flow.shutdown()
+    async with await WorkflowEngine.create(backend=backend) as flow:
+
+        @flow.executable_task
+        async def task1():
+            return "/bin/echo 5"
+
+        @flow.function_task
+        async def task2(t1_result):
+            return int(t1_result.strip()) * 2 * 2
+
+        # create the workflow
+        t1_fut = task1()
+        t2_result = await task2(t1_fut) # t2 depends on t1 (waits for it)
+
+        print(t2_result)
 
 if __name__ == "__main__":
     asyncio.run(main())
diff --git a/docs/async_workflows.md b/docs/async_workflows.md
index e3f836d..0fecc7c 100644
--- a/docs/async_workflows.md
+++ b/docs/async_workflows.md
@@ -36,35 +36,32 @@ from concurrent.futures import ThreadPoolExecutor
 from radical.asyncflow import WorkflowEngine
 
 backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-flow = await WorkflowEngine.create(backend=backend)
-
 async def main():
-    @flow.function_task
-    async def task1(*args):
-        return time.time()
+    async with await WorkflowEngine.create(backend=backend) as flow:
+        @flow.function_task
+        async def task1(*args):
+            return time.time()
 
-    @flow.function_task
-    async def task2(*args):
-        return time.time()
+        @flow.function_task
+        async def task2(*args):
+            return time.time()
 
-    @flow.function_task
-    async def task3(*args):
-        return time.time()
+        @flow.function_task
+        async def task3(*args):
+            return time.time()
 
-    async def run_wf(wf_id):
-        print(f'Starting workflow {wf_id} at {time.time()}')
-        t3 = task3(task1(), task2())
-        await t3 # Blocking operation so the entire workflow will block
-        print(f'Workflow {wf_id} completed at {time.time()}')
+        async def run_wf(wf_id):
+            print(f'Starting workflow {wf_id} at {time.time()}')
+            t3 = task3(task1(), task2())
+            await t3 # Blocking operation so the entire workflow will block
+            print(f'Workflow {wf_id} completed at {time.time()}')
 
-    start_time = time.time()
-    await asyncio.gather(*[run_wf(i) for i in range(5)])
-    end_time = time.time()
+        start_time = time.time()
+        await asyncio.gather(*[run_wf(i) for i in range(5)])
+        end_time = time.time()
 
-    print(f'\nTotal time running asynchronously is: {end_time - start_time}')
+        print(f'\nTotal time running asynchronously is: {end_time - start_time}')
 
-    # We are in an async context, so we have to use await
-    await flow.shutdown()
 asyncio.run(main())
 ```
diff --git a/docs/basic.md b/docs/basic.md
index 0ca4809..3a4d96d 100644
--- a/docs/basic.md
+++ b/docs/basic.md
@@ -35,7 +35,7 @@ We initialize the workflow engine with a `ConcurrentExecutionBackend` using Pyth
 
 ```python
 backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-flow = await WorkflowEngine.create(backend=backend)
+async with 
await WorkflowEngine.create(backend=backend) as flow:
 ```
 
 ---
 
@@ -90,9 +90,6 @@ async def main():
     end_time = time.time()
     print(f"\nWorkflow completed in: {end_time - start_time:.2f} seconds")
 
-    # Shutdown the workflow engine
-    await flow.shutdown()
-
 asyncio.run(main())
 ```
@@ -113,9 +110,9 @@ Here’s an example of the output you might see:
 
 ---
 
-!!! warning
+!!! tip
 
-Make sure to **await the shutdown** of the `WorkflowEngine` before your script exits. Otherwise, resources may leak.
+The context manager (`async with`) automatically handles the shutdown of the `WorkflowEngine` when the block exits, so no explicit shutdown is needed.
 
 ---
 
diff --git a/docs/best_practice.md b/docs/best_practice.md
index a8bd2ae..f25ff5f 100644
--- a/docs/best_practice.md
+++ b/docs/best_practice.md
@@ -91,20 +91,6 @@ result = await task_c(task_a(), task_b())
 
 ---
 
-## Use `await flow.shutdown()`
-
-Always shut down the flow explicitly when finished:
-- Releases resources (e.g., thread pools, processes).
-- Ensures a clean exit.
-
-At the end of your async main:
-
-```python
-await flow.shutdown()
-```
-
----
-
 ## Logging & Debugging
 
 Enable detailed logs to diagnose issues:
@@ -116,12 +102,6 @@ Logs show task dependencies, execution order, errors.
 
 ---
 
-## Clean Shutdown
-
-- Use `try`/`finally` in your async main to ensure `flow.shutdown()` is always called, even on exceptions.
-
----
-
 !!! success
 
 - Define tasks clearly and concisely.
diff --git a/docs/composite_workflow.md b/docs/composite_workflow.md
index 91b4450..a0bc4ec 100644
--- a/docs/composite_workflow.md
+++ b/docs/composite_workflow.md
@@ -39,7 +39,7 @@ from radical.asyncflow import WorkflowEngine
 from concurrent.futures import ThreadPoolExecutor
 
 backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-asyncflow = await WorkflowEngine.create(backend=backend)
+async with await WorkflowEngine.create(backend=backend) as asyncflow:
 ```
 
 ### Define Tasks
 
@@ -116,7 +116,6 @@ await asyncio.gather( # (1)!
 end_time = time.time()
 
 print(f"\nTotal time running asynchronously is: {end_time - start_time:.2f}s")
-await asyncflow.shutdown() # (2)!
 ```
 
 1. Run all composite workflow blocks concurrently
@@ -311,8 +310,5 @@ await block3
 ```
 
-!!! warning
-Do not forget to `await asyncflow.shutdown()` when you are done — otherwise, resources may remain allocated.
-
 !!! tip
 You can replace `ConcurrentExecutionBackend` with `RadicalExecutionBackend` if you want to run on an HPC cluster instead of local threads/processes.
diff --git a/docs/exec_backends.md b/docs/exec_backends.md
index dc1b537..3e882d0 100644
--- a/docs/exec_backends.md
+++ b/docs/exec_backends.md
@@ -50,8 +50,8 @@ from radical.asyncflow import RadicalExecutionBackend
 from radical.asyncflow import WorkflowEngine
 
 # HPC backend configuration
-backend = RadicalExecutionBackend({'resource': 'local.localhost'}) # (1)!
-flow = WorkflowEngine(backend=backend)
+backend = await RadicalExecutionBackend({'resource': 'local.localhost'}) # (1)!
+async with await WorkflowEngine.create(backend=backend) as flow:
 ```
 
 1. Configure for HPC execution - can target supercomputers, GPU clusters, local resources
@@ -118,8 +116,6 @@ await asyncio.gather(*[run_wf(i) for i in range(5)]) # (1)!
 end_time = time.time()
 
 print(f'\nTotal time running asynchronously is: {end_time - start_time}')
-
-# Proper cleanup of HPC resources
-await flow.shutdown()
 ```
 
 1. All workflows execute concurrently across available HPC nodes
@@ -181,7 +179,5 @@ backend = RadicalExecutionBackend({
 ```
 
-!!! 
warning -**Resource Management**: Always call `await flow.shutdown()` to properly release HPC resources and prevent job queue issues. ## Real-World HPC Use Cases diff --git a/docs/index.md b/docs/index.md index 1346d48..eae0cad 100644 --- a/docs/index.md +++ b/docs/index.md @@ -30,7 +30,7 @@ from radical.asyncflow import ConcurrentExecutionBackend async def run(): # Create backend and workflow backend = await ConcurrentExecutionBackend(ThreadPoolExecutor(max_workers=3)) - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: @flow.executable_task async def task1(): @@ -44,8 +44,6 @@ async def run(): t1_fut = task1() t2_result = await task2(t1_fut) # t2 depends on t1 (waits for it) - # shutdown the execution backend - await flow.shutdown() if __name__ == "__main__": asyncio.run(run()) diff --git a/examples/async_block_of_blocks.py b/examples/async_block_of_blocks.py index 5b4a8c6..76f1923 100644 --- a/examples/async_block_of_blocks.py +++ b/examples/async_block_of_blocks.py @@ -8,53 +8,51 @@ async def main(): backend = await RadicalExecutionBackend({'resource': 'local.localhost'}) - flow = await WorkflowEngine.create(backend=backend) - - @flow.executable_task - async def task1(*args): - return '/bin/echo "I got executed at" && /bin/date' - - @flow.executable_task - async def task2(*args): - return '/bin/echo "I got executed at" && /bin/date' - - @flow.block - async def block1(*args): - print(f'block1 started at {time.time()}') - t1 = task1() - t2 = task2(t1) - await t2 - - @flow.block - async def block2(*args): - print(f'block2 started at {time.time()}') - t3 = task1() - t4 = task2(t3) - await t4 - - @flow.block - async def block1_of_blocks(*args): - print(f'block of blocks-1 started at {time.time()}') - b1 = block1() - b2 = block2(b1) - await b2 - - @flow.block - async def block2_of_blocks(*args): - print(f'block of blocks-2 started at {time.time()}') - b1 = block1() - b2 = block2(b1) - await b2 - - async def run_block_of_blocks(i): - bob1 = block1_of_blocks() - bob2 = block2_of_blocks(bob1) - await bob2 - print(f'Block of blocks-{i} is finished') - - await asyncio.gather(*[run_block_of_blocks(i) for i in range(2)]) - - await flow.shutdown() + async with await WorkflowEngine.create(backend=backend) as flow: + + @flow.executable_task + async def task1(*args): + return '/bin/echo "I got executed at" && /bin/date' + + @flow.executable_task + async def task2(*args): + return '/bin/echo "I got executed at" && /bin/date' + + @flow.block + async def block1(*args): + print(f'block1 started at {time.time()}') + t1 = task1() + t2 = task2(t1) + await t2 + + @flow.block + async def block2(*args): + print(f'block2 started at {time.time()}') + t3 = task1() + t4 = task2(t3) + await t4 + + @flow.block + async def block1_of_blocks(*args): + print(f'block of blocks-1 started at {time.time()}') + b1 = block1() + b2 = block2(b1) + await b2 + + @flow.block + async def block2_of_blocks(*args): + print(f'block of blocks-2 started at {time.time()}') + b1 = block1() + b2 = block2(b1) + await b2 + + async def run_block_of_blocks(i): + bob1 = block1_of_blocks() + bob2 = block2_of_blocks(bob1) + await bob2 + print(f'Block of blocks-{i} is finished') + + await asyncio.gather(*[run_block_of_blocks(i) for i in range(2)]) if __name__ == '__main__': diff --git a/examples/async_blocks.py b/examples/async_blocks.py index e01e57d..2c75788 100644 --- a/examples/async_blocks.py +++ b/examples/async_blocks.py @@ -7,75 +7,73 @@ async def main(): # Create backend 
and workflow backend = await RadicalExecutionBackend({'resource': 'local.localhost'}) - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: - @flow.executable_task - async def task1(*args): - return '/bin/echo "I got executed at" && /bin/date' + @flow.executable_task + async def task1(*args): + return '/bin/echo "I got executed at" && /bin/date' - @flow.executable_task - async def task2(*args): - return '/bin/echo "I got executed at" && /bin/date' + @flow.executable_task + async def task2(*args): + return '/bin/echo "I got executed at" && /bin/date' - @flow.executable_task - async def task3(*args): - return '/bin/echo "I got executed at" && /bin/date' + @flow.executable_task + async def task3(*args): + return '/bin/echo "I got executed at" && /bin/date' - @flow.executable_task - async def task4(*args): - return '/bin/echo "I got executed at" && /bin/date' + @flow.executable_task + async def task4(*args): + return '/bin/echo "I got executed at" && /bin/date' - @flow.executable_task - async def task5(*args): - return '/bin/echo "I got executed at" && /bin/date' + @flow.executable_task + async def task5(*args): + return '/bin/echo "I got executed at" && /bin/date' - @flow.block - async def block1(wf_id, *args): - print(f'Starting workflow {wf_id}') - t1 = task1() - tt = task1() - t2 = task2(t1) - t3 = task3(t1, t2) - t4 = task4(t3) + @flow.block + async def block1(wf_id, *args): + print(f'Starting workflow {wf_id}') + t1 = task1() + tt = task1() + t2 = task2(t1) + t3 = task3(t1, t2) + t4 = task4(t3) - if await t4: - t5 = task5() - await t5 + if await t4: + t5 = task5() + await t5 - return f'Workflow {wf_id} completed' + return f'Workflow {wf_id} completed' - @flow.block - async def block2(wf_id, *args): - print(f'\nStarting workflow {wf_id}') - t1 = task1() - t2 = task2(t1) - t3 = task3(t1, t2) - t4 = task4(t3) + @flow.block + async def block2(wf_id, *args): + print(f'\nStarting workflow {wf_id}') + t1 = task1() + t2 = task2(t1) + t3 = task3(t1, t2) + t4 = task4(t3) - if await t4: - t5 = task5() - await t5 + if await t4: + t5 = task5() + await t5 - return f'Workflow {wf_id} completed' + return f'Workflow {wf_id} completed' - async def run_blocks(wf_id): + async def run_blocks(wf_id): - b1 = block1(wf_id) - b2 = block2(wf_id, b1) - await b2 - return "run blocks finished" + b1 = block1(wf_id) + b2 = block2(wf_id, b1) + await b2 + return "run blocks finished" - # Run workflows concurrently - results = await asyncio.gather(*[run_blocks(i) for i in range(1024)]) + # Run workflows concurrently + results = await asyncio.gather(*[run_blocks(i) for i in range(1024)]) - for result in results: - print(result) - - await flow.shutdown() + for result in results: + print(result) if __name__ == '__main__': asyncio.run(main()) diff --git a/examples/async_workflows.py b/examples/async_workflows.py index 55adad7..d4a5963 100644 --- a/examples/async_workflows.py +++ b/examples/async_workflows.py @@ -7,40 +7,40 @@ async def main(): # Create backend and workflow backend = await RadicalExecutionBackend({'resource': 'local.localhost'}) - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: - @flow.executable_task - async def task1(*args): - return '/bin/echo "I got executed at" && /bin/date' + @flow.executable_task + async def task1(*args): + return '/bin/echo "I got executed at" && /bin/date' - @flow.executable_task - async def task2(*args): - return '/bin/echo "I got executed at" && 
/bin/date'
+        @flow.executable_task
+        async def task2(*args):
+            return '/bin/echo "I got executed at" && /bin/date'
 
-    @flow.executable_task
-    async def task3(*args):
-        return '/bin/echo "I got executed at" && /bin/date'
+        @flow.executable_task
+        async def task3(*args):
+            return '/bin/echo "I got executed at" && /bin/date'
 
-    @flow.executable_task
-    async def task4(*args):
-        return '/bin/echo "I got executed at" && /bin/date'
+        @flow.executable_task
+        async def task4(*args):
+            return '/bin/echo "I got executed at" && /bin/date'
 
-    @flow.executable_task
-    async def task5(*args):
-        return '/bin/echo "I got executed at" && /bin/date'
+        @flow.executable_task
+        async def task5(*args):
+            return '/bin/echo "I got executed at" && /bin/date'
 
-    async def run_wf(wf_id):
-        print(f'\nStarting workflow {wf_id} at {time.time()}')
-        t1 = task1()
-        t2 = task2(t1)
-        t3 = task3(t1, t2)
-        t4 = task4(t3)
+        async def run_wf(wf_id):
+            print(f'\nStarting workflow {wf_id} at {time.time()}')
+            t1 = task1()
+            t2 = task2(t1)
+            t3 = task3(t1, t2)
+            t4 = task4(t3)
 
-        if await t4:
-            t5 = task5()
-            await t5
+            if await t4:
+                t5 = task5()
+                await t5
 
-        return f'Workflow {wf_id} completed at {time.time()}'
+            return f'Workflow {wf_id} completed at {time.time()}'
 
-    # Run workflows concurrently
-    results = await asyncio.gather(*[run_wf(i) for i in range(1024)])
+        # Run workflows concurrently
+        results = await asyncio.gather(*[run_wf(i) for i in range(1024)])
@@ -48,7 +48,5 @@ async def run_wf(wf_id):
-    for result in results:
-        print(result)
+        for result in results:
+            print(result)
 
-    await flow.shutdown()
-
 if __name__ == '__main__':
     asyncio.run(main())
diff --git a/examples/explicit_data.py b/examples/explicit_data.py
index bef3dd8..b5de34f 100644
--- a/examples/explicit_data.py
+++ b/examples/explicit_data.py
@@ -1,4 +1,5 @@
 import time
+import asyncio
 
 from radical.asyncflow import WorkflowEngine
 from radical.asyncflow import InputFile, OutputFile
@@ -6,23 +7,21 @@
 
 async def main():
     backend = await RadicalExecutionBackend({'resource': 'local.localhost'})
-    flow = await WorkflowEngine.create(backend=backend)
+    async with await WorkflowEngine.create(backend=backend) as flow:
 
-    @flow.executable_task
-    def task1(*args):
-        return 'echo "This is a file from task1" > t1_output.txt'
+        @flow.executable_task
+        def task1(*args):
+            return 'echo "This is a file from task1" > t1_output.txt'
 
-    @flow.executable_task
-    def task2(*args):
-        return '/bin/cat t1_output.txt'
+        @flow.executable_task
+        def task2(*args):
+            return '/bin/cat t1_output.txt'
 
-    t1 = task1(OutputFile('t1_output.txt'))
-    t2 = task2(t1, InputFile('t1_output.txt'))
+        t1 = task1(OutputFile('t1_output.txt'))
+        t2 = task2(t1, InputFile('t1_output.txt'))
 
-    print(await t2)
-
-    await flow.shutdown()
+        print(await t2)
 
 if __name__ == '__main__':
diff --git a/examples/tutorials/build_async_workflows.ipynb b/examples/tutorials/build_async_workflows.ipynb
index 8239611..8647492 100644
--- a/examples/tutorials/build_async_workflows.ipynb
+++ b/examples/tutorials/build_async_workflows.ipynb
@@ -105,35 +105,31 @@
    "\n",
    "backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())\n",
    "\n",
-    "flow = await WorkflowEngine.create(backend=backend)\n",
+    "async with await WorkflowEngine.create(backend=backend) as flow:\n",
+    "    @flow.function_task\n",
+    "    async def task1(*args):\n",
+    "        return time.time()\n",
    "\n",
-    "@flow.function_task\n",
-    "async def task1(*args):\n",
-    "    return time.time()\n",
+    "    @flow.function_task\n",
+    "    async def task2(*args):\n",
+    "        return time.time()\n",
    "\n",
-    "@flow.function_task\n",
-    "async def task2(*args):\n",
-    "    return time.time()\n",
+    "    @flow.function_task\n",
+    "    async def task3(*args):\n",
+    "        return 
time.time()\n",
    "\n",
-    "@flow.function_task\n",
-    "async def task3(*args):\n",
-    "    return time.time()\n",
+    "    async def run_wf(wf_id):\n",
    "\n",
-    "async def run_wf(wf_id):\n",
+    "        print(f'Starting workflow {wf_id} at {time.time()}')\n",
+    "        t3 = task3(task1(), task2())\n",
+    "        await t3 # Blocking operation so the entire workflow will block\n",
+    "        print(f'Workflow {wf_id} completed at {time.time()}')\n",
    "\n",
-    "    print(f'Starting workflow {wf_id} at {time.time()}')\n",
-    "    t3 = task3(task1(), task2())\n",
-    "    await t3 # Blocking operation so the entire workflow will block\n",
-    "    print(f'Workflow {wf_id} completed at {time.time()}')\n",
+    "    start_time = time.time()\n",
+    "    await asyncio.gather(*[run_wf(i) for i in range(5)])\n",
+    "    end_time = time.time()\n",
    "\n",
-    "start_time = time.time()\n",
-    "await asyncio.gather(*[run_wf(i) for i in range(5)])\n",
-    "end_time = time.time()\n",
-    "\n",
-    "print(f'\\nTotal time running asynchronously is: {end_time - start_time}')\n",
-    "\n",
-    "# We are in an async context, so we have to use **await**\n",
-    "await flow.shutdown()"
+    "    print(f'\\nTotal time running asynchronously is: {end_time - start_time}')"
   ]
  },
 {
@@ -173,35 +169,32 @@
    "\n",
    "# backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())\n",
    "\n",
-    "flow = await WorkflowEngine.create(backend=backend)\n",
-    "\n",
-    "@flow.executable_task\n",
-    "async def task1(*args):\n",
-    "    return \"/bin/date\"\n",
-    "\n",
-    "@flow.executable_task\n",
-    "async def task2(*args):\n",
-    "    return \"/bin/date\"\n",
+    "async with await WorkflowEngine.create(backend=backend) as flow:\n",
+    "    @flow.executable_task\n",
+    "    async def task1(*args):\n",
+    "        return \"/bin/date\"\n",
    "\n",
-    "@flow.executable_task\n",
-    "async def task3(*args):\n",
-    "    return \"/bin/date\"\n",
+    "    @flow.executable_task\n",
+    "    async def task2(*args):\n",
+    "        return \"/bin/date\"\n",
    "\n",
-    "async def run_wf(wf_id):\n",
+    "    @flow.executable_task\n",
+    "    async def task3(*args):\n",
+    "        return \"/bin/date\"\n",
    "\n",
-    "    print(f'Starting workflow {wf_id} at {time.time()}')\n",
-    "    t3 = task3(task1(), task2())\n",
-    "    await t3 # Blocking operation so the entire workflow will block\n",
-    "    print(f'Workflow {wf_id} completed at {time.time()}')\n",
+    "    async def run_wf(wf_id):\n",
    "\n",
-    "start_time = time.time()\n",
-    "await asyncio.gather(*[run_wf(i) for i in range(5)])\n",
-    "end_time = time.time()\n",
+    "        print(f'Starting workflow {wf_id} at {time.time()}')\n",
+    "        t3 = task3(task1(), task2())\n",
+    "        await t3 # Blocking operation so the entire workflow will block\n",
+    "        print(f'Workflow {wf_id} completed at {time.time()}')\n",
    "\n",
-    "print(f'\\nTotal time running asynchronously is: {end_time - start_time}')\n",
+    "    start_time = time.time()\n",
+    "    await asyncio.gather(*[run_wf(i) for i in range(5)])\n",
+    "    end_time = time.time()\n",
    "\n",
-    "# We are in an async context, so we have to use **await**\n",
-    "await flow.shutdown()"
+    "    print(f'\\nTotal time running asynchronously is: {end_time - start_time}')\n",
+    "\n"
   ]
  }
],
diff --git a/examples/various_execution_backends.py b/examples/various_execution_backends.py
index 5be9015..8695a3f 100644
--- a/examples/various_execution_backends.py
+++ b/examples/various_execution_backends.py
@@ -10,31 +10,29 @@ async def main(backends):
 
     for backend, resource in backends.items():
         backend = await backend(resource)
-        flow = await WorkflowEngine.create(backend=backend)
+        async with await WorkflowEngine.create(backend=backend) as flow:
 
-        task = flow.executable_task
-        if isinstance(backend, 
DaskExecutionBackend): - task = flow.function_task + task = flow.executable_task + if isinstance(backend, DaskExecutionBackend): + task = flow.function_task - @task - async def task1(*args): - return '/bin/sleep 10' + @task + async def task1(*args): + return '/bin/sleep 10' - @task - async def task2(*args): - return '/bin/date' + @task + async def task2(*args): + return '/bin/date' - @task - async def task3(*args): - return '/bin/date' + @task + async def task3(*args): + return '/bin/date' - t1 = task1() - t2 = task2() - t3 = await task3(t1, t2) + t1 = task1() + t2 = task2() + t3 = await task3(t1, t2) - print(t3) - - await flow.shutdown() + print(t3) if __name__ == "__main__": print('Running 1-layer funnel DAG workflow with each backend\n') diff --git a/examples/various_workflows.py b/examples/various_workflows.py index cb960b6..3ab689a 100644 --- a/examples/various_workflows.py +++ b/examples/various_workflows.py @@ -3,55 +3,55 @@ from radical.asyncflow import WorkflowEngine from radical.asyncflow import RadicalExecutionBackend -async def main() - backend = await RadicalExecutionBackend({'resource': 'local.localhost'}) - flow = await WorkflowEngine.create(backend=backend) +async def main(): + backend = await RadicalExecutionBackend({'resource': 'local.localhost'}) + async with await WorkflowEngine.create(backend=backend) as flow: - @flow.executable_task - async def task1(*args): - return '/bin/echo $RP_TASK_NAME' + @flow.executable_task + async def task1(*args): + return '/bin/echo $RP_TASK_NAME' - @flow.executable_task - async def task2(*args): - return '/bin/echo $RP_TASK_NAME' + @flow.executable_task + async def task2(*args): + return '/bin/echo $RP_TASK_NAME' - @flow.executable_task - async def task3(*args): - return '/bin/echo $RP_TASK_NAME' + @flow.executable_task + async def task3(*args): + return '/bin/echo $RP_TASK_NAME' - # ==================================================== - # Workflow-1: 1-layer funnel DAG - print('Running 1-layer funnel DAG workflow\n') - print("Shape:") - print(""" + # ==================================================== + # Workflow-1: 1-layer funnel DAG + print('Running 1-layer funnel DAG workflow\n') + print("Shape:") + print(""" task1 task2 <---- running in parallel \\ / task3 """) - t3 = task3(task1(), task2()) + t3 = task3(task1(), task2()) - print(await t3) + print(await t3) - # ==================================================== - # Workflow-2: 2-layer funnel DAG - print('Running 2-layer funnel DAG workflow\n') - print("Shape:") - print(""" + # ==================================================== + # Workflow-2: 2-layer funnel DAG + print('Running 2-layer funnel DAG workflow\n') + print("Shape:") + print(""" task1 task2 <---- running in parallel | | task2 task1 <---- running in parallel \\ / task3 """) - t3 = task3(task2(task1()), task1(task2())) - print(await t3) + t3 = task3(task2(task1()), task1(task2())) + print(await t3) - # ==================================================== - # Workflow-3: Sequential Pipelines (Repeated Twice) - print('Running sequential pipelines\n') - print("Shape:") - print(""" + # ==================================================== + # Workflow-3: Sequential Pipelines (Repeated Twice) + print('Running sequential pipelines\n') + print("Shape:") + print(""" task1 | task2 @@ -64,16 +64,16 @@ async def task3(*args): | task3 """) - res = [] - for i in range(2): - t3 = task3(task2(task1())) - print(await t3) + res = [] + for i in range(2): + t3 = task3(task2(task1())) + print(await t3) - # 
====================================================
-    # Workflow-4: Concurrent Pipelines
-    print('Running concurrent pipelines\n')
-    print("Shape:")
-    print("""
+        # ====================================================
+        # Workflow-4: Concurrent Pipelines
+        print('Running concurrent pipelines\n')
+        print("Shape:")
+        print("""
 task1     task1
   |         |
@@ -81,14 +81,12 @@ async def task3(*args):
 task2     task2
   |         |
 task3     task3
 """)
-    res = []
-    for i in range(2):
-        t3 = task3(task2(task1()))
-        res.append(t3)
+        res = []
+        for i in range(2):
+            t3 = task3(task2(task1()))
+            res.append(t3)
 
-    await asyncio.gather(*res)
-
-    await flow.shutdown()
+        await asyncio.gather(*res)
 
 if __name__ == '__main__':
     asyncio.run(main())
diff --git a/src/radical/asyncflow/workflow_manager.py b/src/radical/asyncflow/workflow_manager.py
index f5de8c0..091b273 100644
--- a/src/radical/asyncflow/workflow_manager.py
+++ b/src/radical/asyncflow/workflow_manager.py
@@ -43,7 +43,7 @@ class WorkflowEngine:
 
     @typeguard.typechecked
     def __init__(self, backend: BaseExecutionBackend,
-                 dry_run: bool = False, implicit_data: bool = True) -> None:
+                 dry_run: bool = False, implicit_data: bool = True, skip_execution_backend: bool = False) -> None:
         """
         Initialize the WorkflowEngine (sync part only).
 
@@ -53,6 +53,7 @@
             backend: Execution backend (required, pre-validated)
             dry_run: Whether to run in dry-run mode
             implicit_data: Whether to enable implicit data dependency linking
+            skip_execution_backend: Whether to skip shutting down the execution backend when the context manager exits
         """
         # Get the current running loop - assume it exists
         self.loop = _get_event_loop_or_raise("WorkflowEngine")
@@ -68,6 +69,8 @@
         self.dry_run = dry_run
         self.queue = asyncio.Queue()
         self.implicit_data_mode = implicit_data
+        self.skip_execution_backend = skip_execution_backend
+
 
         # Optimization: Track component state changes
         self._ready_queue = deque()
@@ -98,7 +101,7 @@
 
     @classmethod
     async def create(cls, backend: Optional[BaseExecutionBackend] = None,
-                     dry_run: bool = False, implicit_data: bool = True) -> 'WorkflowEngine':
+                     dry_run: bool = False, implicit_data: bool = True, skip_execution_backend: bool = False) -> 'WorkflowEngine':
         """
         Factory method to create and initialize a WorkflowEngine.
 
@@ -117,7 +120,7 @@ async def create(cls, backend: Optional[BaseExecutionBackend] = None,
         validated_backend = cls._setup_execution_backend(backend, dry_run)
 
         # Create instance with validated backend
-        instance = cls(backend=validated_backend, dry_run=dry_run, implicit_data=implicit_data)
+        instance = cls(backend=validated_backend, dry_run=dry_run, implicit_data=implicit_data, skip_execution_backend=skip_execution_backend)
 
         # Initialize async components
         await instance._start_async_components()
@@ -1123,6 +1126,13 @@ async def shutdown(self, skip_execution_backend: bool = False):
             self.log.debug("Shutting down execution backend")
         else:
             self.log.warning("Skipping execution backend shutdown as requested")
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.shutdown(self.skip_execution_backend)
+
 
     @staticmethod
     def shutdown_on_failure(func: Callable):
diff --git a/tests/integration/backends/test_dask_parallel.py b/tests/integration/backends/test_dask_parallel.py
index 0f6ab97..71f7c87 100644
--- a/tests/integration/backends/test_dask_parallel.py
+++ b/tests/integration/backends/test_dask_parallel.py
@@ -16,10 +16,10 @@ async def flow():
             'dashboard_address':None
         }
     )
-    flow = await WorkflowEngine.create(backend=backend)
-
-    # provide the flow to the test
-    yield flow
+    async with await WorkflowEngine.create(backend=backend) as flow:
+        # provide the flow to the test
+        yield flow
+
 
 @pytest.mark.asyncio
 async def test_funnel_dag_with_dask_backend(flow):
@@ -211,5 +211,3 @@ async def child(x):
     with pytest.raises(asyncio.CancelledError):
         # child cannot complete
         await t2
-
-    await flow.shutdown()
diff --git a/tests/integration/backends/test_radical_pilot.py b/tests/integration/backends/test_radical_pilot.py
index dd37486..2d092c4 100644
--- a/tests/integration/backends/test_radical_pilot.py
+++ b/tests/integration/backends/test_radical_pilot.py
@@ -17,176 +17,169 @@ async def backend():
 
 @pytest.mark.asyncio
 async def test_async_bag_of_tasks(backend):
-    flow = await WorkflowEngine.create(backend=backend)
+    async with await WorkflowEngine.create(backend=backend) as flow:
 
-    @flow.executable_task
-    async def echo_task(i):
-        return f'/bin/echo "Task {i} executed at" && /bin/date'
+        @flow.executable_task
+        async def echo_task(i):
+            return f'/bin/echo "Task {i} executed at" && /bin/date'
 
-    bag_size = 10
-    tasks = [echo_task(i) for i in range(bag_size)]
-    results = await asyncio.gather(*tasks)
+        bag_size = 10
+        tasks = [echo_task(i) for i in range(bag_size)]
+        results = await asyncio.gather(*tasks)
 
-    assert len(results) == bag_size
-    for i, result in enumerate(results):
-        assert result is not None
-        assert isinstance(result, (str, bytes))
+        assert len(results) == bag_size
+        for i, result in enumerate(results):
+            assert result is not None
+            assert isinstance(result, (str, bytes))
 
 
 @pytest.mark.asyncio
 async def test_radical_backend_reject_service_task_function(backend):
-    flow = await WorkflowEngine.create(backend=backend)
-    with pytest.raises(ValueError, match="RadicalExecutionBackend does not support function service tasks"):
-        @flow.function_task(service=True)
-        async def bad_task2():
-            return True
+    async with await WorkflowEngine.create(backend=backend, skip_execution_backend=True) as flow:
+        with pytest.raises(ValueError, match="RadicalExecutionBackend does not support function service tasks"):
+            @flow.function_task(service=True)
+            async def bad_task2():
+                return True
 
-        await bad_task2()
+            await bad_task2()
 
-    await 
flow.shutdown(skip_execution_backend=True) @pytest.mark.asyncio async def test_radical_backend_reject_function_task_with_raptor_off(backend): - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend, skip_execution_backend=True) as flow: - with pytest.raises(RuntimeError): - @flow.function_task - async def bad_task3(): - return True + with pytest.raises(RuntimeError): + @flow.function_task + async def bad_task3(): + return True - await bad_task3() + await bad_task3() - await flow.shutdown(skip_execution_backend=True) @pytest.mark.asyncio async def test_radical_backend_implicit_data(backend): - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: - @flow.executable_task - async def task1(*args): - return 'echo "This is a file from task1" > t1_output.txt' + @flow.executable_task + async def task1(*args): + return 'echo "This is a file from task1" > t1_output.txt' - @flow.executable_task - async def task2(*args): - return '/bin/cat t1_output.txt' + @flow.executable_task + async def task2(*args): + return '/bin/cat t1_output.txt' - t1 = task1() - t2 = task2(t1) + t1 = task1() + t2 = task2(t1) - assert await t2 == 'This is a file from task1\n' - await flow.shutdown(skip_execution_backend=True) + assert await t2 == 'This is a file from task1\n' @pytest.mark.asyncio async def test_radical_backend_explicit_data(backend): - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: - @flow.executable_task - async def task1(*args): - return 'echo "This is a file from task1" > t1_output.txt' + @flow.executable_task + async def task1(*args): + return 'echo "This is a file from task1" > t1_output.txt' - @flow.executable_task - async def task2(*args): - return '/bin/cat t1_output.txt' + @flow.executable_task + async def task2(*args): + return '/bin/cat t1_output.txt' - t1 = task1(OutputFile('t1_output.txt')) - t2 = task2(t1, InputFile('t1_output.txt')) + t1 = task1(OutputFile('t1_output.txt')) + t2 = task2(t1, InputFile('t1_output.txt')) - assert await t2 == 'This is a file from task1\n' - await flow.shutdown(skip_execution_backend=True) + assert await t2 == 'This is a file from task1\n' @pytest.mark.asyncio async def test_radical_backend_input_data_staging(backend): - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: - with open('t1_input.txt', 'w') as f: - f.write('This is a file staged in to task1\n') + with open('t1_input.txt', 'w') as f: + f.write('This is a file staged in to task1\n') - @flow.executable_task - async def task1(*args): - return '/bin/cat t1_input.txt' + @flow.executable_task + async def task1(*args): + return '/bin/cat t1_input.txt' - t1 = task1(InputFile('t1_input.txt')) + t1 = task1(InputFile('t1_input.txt')) - assert await t1 == 'This is a file staged in to task1\n' - await flow.shutdown(skip_execution_backend=True) + assert await t1 == 'This is a file staged in to task1\n' @pytest.mark.asyncio async def test_async_cancel_tasks(backend): - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: - @flow.executable_task - async def task(): - return '/bin/sleep 10' + @flow.executable_task + async def task(): + return '/bin/sleep 10' - t1 = task() - t2 = task() + t1 = task() + t2 = task() - await asyncio.sleep(1) + await asyncio.sleep(1) - t1.cancel() - t2.cancel() + 
t1.cancel() + t2.cancel() - with pytest.raises(asyncio.CancelledError): - await t1 - with pytest.raises(asyncio.CancelledError): - await t2 + with pytest.raises(asyncio.CancelledError): + await t1 + with pytest.raises(asyncio.CancelledError): + await t2 @pytest.mark.asyncio async def test_async_cancel_before_start(backend): - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: - @flow.executable_task - async def slow_task(): - return '/bin/sleep 5' + @flow.executable_task + async def slow_task(): + return '/bin/sleep 5' - @flow.executable_task - async def fast_task(): - return '/bin/echo "done"' + @flow.executable_task + async def fast_task(): + return '/bin/echo "done"' - t1 = slow_task() - t2 = slow_task() + t1 = slow_task() + t2 = slow_task() - await asyncio.sleep(1) - t2.cancel() + await asyncio.sleep(1) + t2.cancel() - await t1 - with pytest.raises(asyncio.CancelledError): - await t2 + await t1 + with pytest.raises(asyncio.CancelledError): + await t2 @pytest.mark.asyncio async def test_async_cancel_after_completion(backend): - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: - @flow.executable_task - async def quick_task(): - return "/bin/echo 'done'" + @flow.executable_task + async def quick_task(): + return "/bin/echo 'done'" - t = quick_task() - result = await t - assert result.strip() == "done" + t = quick_task() + result = await t + assert result.strip() == "done" - t.cancel() - t_result = await t - assert t_result.strip() == "done" + t.cancel() + t_result = await t + assert t_result.strip() == "done" @pytest.mark.asyncio async def test_async_cancel_one_of_many(backend): - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: - @flow.executable_task - async def task(n): - return '/bin/sleep 5 && /bin/echo "2"' + @flow.executable_task + async def task(n): + return '/bin/sleep 5 && /bin/echo "2"' - t1 = task(1) - t2 = task(2) + t1 = task(1) + t2 = task(2) - await asyncio.sleep(1) - t1.cancel() - - with pytest.raises(asyncio.CancelledError): - await t1 + await asyncio.sleep(1) + t1.cancel() - str_result = await t2 - assert int(str_result) == 2 + with pytest.raises(asyncio.CancelledError): + await t1 - await flow.shutdown() + str_result = await t2 + assert int(str_result) == 2 diff --git a/tests/integration/test_workflow_failures.py b/tests/integration/test_workflow_failures.py index a70c7dd..b84bde9 100644 --- a/tests/integration/test_workflow_failures.py +++ b/tests/integration/test_workflow_failures.py @@ -25,29 +25,26 @@ async def test_task_failure_handling(): - failing_task raises an Exception with the message "Simulated failure". 
""" backend = await ConcurrentExecutionBackend(ThreadPoolExecutor()) - flow = await WorkflowEngine.create(backend=backend) - - @flow.function_task - async def good_task(): - return "success" + async with await WorkflowEngine.create(backend=backend) as flow: + + @flow.function_task + async def good_task(): + return "success" - @flow.function_task - async def failing_task(): - raise RuntimeError("Simulated failure") + @flow.function_task + async def failing_task(): + raise RuntimeError("Simulated failure") - @flow.function_task - async def dependent_task(prev_result): - return f"Got: {prev_result}" + @flow.function_task + async def dependent_task(prev_result): + return f"Got: {prev_result}" - try: # Run good task first res = await good_task() assert res == "success" with pytest.raises(RuntimeError, match="Simulated failure"): await failing_task() - finally: - await flow.shutdown() @pytest.mark.asyncio @@ -64,23 +61,19 @@ async def test_awaiting_failed_task_propagates_exception(): shut down. """ backend = await ConcurrentExecutionBackend(ThreadPoolExecutor()) - flow = await WorkflowEngine.create(backend=backend) - - @flow.function_task - async def task1(): - raise ValueError("Intentional failure in task1") + async with await WorkflowEngine.create(backend=backend) as flow: + @flow.function_task + async def task1(): + raise ValueError("Intentional failure in task1") - @flow.function_task - async def task2(x): - return f"Received: {x}" + @flow.function_task + async def task2(x): + return f"Received: {x}" - try: # The test will fail at await task1(), so task2 should never run with pytest.raises(ValueError, match="Intentional failure in task1"): await task2(await task1()) - - finally: - await flow.shutdown() + @pytest.mark.asyncio @@ -93,27 +86,26 @@ async def test_independent_workflow_failures_do_not_affect_others(): """ backend = await ConcurrentExecutionBackend(ThreadPoolExecutor()) - flow = await WorkflowEngine.create(backend=backend) - - @flow.function_task - async def task1(wf_id): - if wf_id == 0: - raise Exception("Intentional failure in workflow 0 task1") - return f"task1 success from wf {wf_id}" - - @flow.function_task - async def task2(x): - return f"task2 got: {x}" - - async def run_wf(wf_id): - try: - t1 = await task1(wf_id) - t2 = await task2(t1) - return f"Workflow {wf_id} success: {t2}" - except Exception as e: - return f"Workflow {wf_id} failed: {str(e)}" - - try: + async with await WorkflowEngine.create(backend=backend) as flow: + + @flow.function_task + async def task1(wf_id): + if wf_id == 0: + raise Exception("Intentional failure in workflow 0 task1") + return f"task1 success from wf {wf_id}" + + @flow.function_task + async def task2(x): + return f"task2 got: {x}" + + async def run_wf(wf_id): + try: + t1 = await task1(wf_id) + t2 = await task2(t1) + return f"Workflow {wf_id} success: {t2}" + except Exception as e: + return f"Workflow {wf_id} failed: {str(e)}" + # Run 5 workflows concurrently, workflow 0 should fail results = await asyncio.gather(*[run_wf(i) for i in range(5)]) @@ -124,5 +116,3 @@ async def run_wf(wf_id): for i in range(1, 5): assert f"Workflow {i} success:" in results[i] - finally: - await flow.shutdown() diff --git a/tests/integration/test_workflow_runs_to_completion.py b/tests/integration/test_workflow_runs_to_completion.py index 775ac99..91187f7 100644 --- a/tests/integration/test_workflow_runs_to_completion.py +++ b/tests/integration/test_workflow_runs_to_completion.py @@ -12,52 +12,51 @@ async def test_flow_function_tasks(): which is verified at the end 
of execution. """ backend = await ConcurrentExecutionBackend(ThreadPoolExecutor()) - flow = await WorkflowEngine.create(backend=backend) - - # Shared state is passed and returned explicitly across tasks - @flow.function_task - async def task1(): - return {"steps": ["task1"], "value": 1} - - @flow.function_task - async def task2(state): - state["steps"].append("task2") - state["value"] += 2 - return state - - @flow.function_task - async def task3(state): - state["steps"].append("task3") - state["value"] *= 3 - return state - - @flow.function_task - async def task4(state): - state["steps"].append("task4") - state["value"] -= 4 - return state - - @flow.function_task - async def task5(state): - state["steps"].append("task5") - state["value"] = state["value"] ** 2 - return state - - async def run_wf(wf_id): - """ - Runs a chain of function tasks where each builds upon shared state. - """ - print(f'\n[WF {wf_id}] Start: {time.time():.2f}') - s1 = await task1() - s2 = await task2(s1) - s3 = await task3(s2) - s4 = await task4(s3) - s5 = await task5(s4) - - print(f'[WF {wf_id}] Done: {time.time():.2f} — Final state: {s5}') - return s5 - - try: + async with await WorkflowEngine.create(backend=backend) as flow: + + # Shared state is passed and returned explicitly across tasks + @flow.function_task + async def task1(): + return {"steps": ["task1"], "value": 1} + + @flow.function_task + async def task2(state): + state["steps"].append("task2") + state["value"] += 2 + return state + + @flow.function_task + async def task3(state): + state["steps"].append("task3") + state["value"] *= 3 + return state + + @flow.function_task + async def task4(state): + state["steps"].append("task4") + state["value"] -= 4 + return state + + @flow.function_task + async def task5(state): + state["steps"].append("task5") + state["value"] = state["value"] ** 2 + return state + + async def run_wf(wf_id): + """ + Runs a chain of function tasks where each builds upon shared state. + """ + print(f'\n[WF {wf_id}] Start: {time.time():.2f}') + s1 = await task1() + s2 = await task2(s1) + s3 = await task3(s2) + s4 = await task4(s3) + s5 = await task5(s4) + + print(f'[WF {wf_id}] Done: {time.time():.2f} — Final state: {s5}') + return s5 + num_workflows = 4 results = await asyncio.gather(*[run_wf(i) for i in range(num_workflows)]) @@ -68,8 +67,6 @@ async def run_wf(wf_id): "task4", "task5"] assert res["value"] == 25 - finally: - await flow.shutdown() print("Workflow engine shutdown complete.") @@ -81,50 +78,49 @@ async def test_flow_executable_tasks(tmp_path): Final task output is used to validate execution order. 
""" backend = await ConcurrentExecutionBackend(ThreadPoolExecutor()) - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: - # Define executable tasks that append their ID to a shared file - @flow.executable_task - async def task1(wf_file): - return f'echo "task1" >> {wf_file}' + # Define executable tasks that append their ID to a shared file + @flow.executable_task + async def task1(wf_file): + return f'echo "task1" >> {wf_file}' - @flow.executable_task - async def task2(wf_file, t1): - return f'echo "task2" >> {wf_file}' + @flow.executable_task + async def task2(wf_file, t1): + return f'echo "task2" >> {wf_file}' - @flow.executable_task - async def task3(wf_file, t2): - return f'echo "task3" >> {wf_file}' + @flow.executable_task + async def task3(wf_file, t2): + return f'echo "task3" >> {wf_file}' - @flow.executable_task - async def task4(wf_file, t3): - return f'echo "task4" >> {wf_file}' + @flow.executable_task + async def task4(wf_file, t3): + return f'echo "task4" >> {wf_file}' - @flow.executable_task - async def task5(wf_file, t4): - return f'echo "task5" >> {wf_file}' + @flow.executable_task + async def task5(wf_file, t4): + return f'echo "task5" >> {wf_file}' - async def run_wf(wf_id): - """ - Runs executable tasks that log their execution to a local file. - """ - wf_file = tmp_path / f"workflow_{wf_id}.log" - wf_file_path = str(wf_file) + async def run_wf(wf_id): + """ + Runs executable tasks that log their execution to a local file. + """ + wf_file = tmp_path / f"workflow_{wf_id}.log" + wf_file_path = str(wf_file) - print(f'\n[WF {wf_id}] Start: {time.time():.2f}') + print(f'\n[WF {wf_id}] Start: {time.time():.2f}') - t1 = task1(wf_file_path) - t2 = task2(wf_file_path, t1) - t3 = task3(wf_file_path, t2) - t4 = task4(wf_file_path, t3) - t5 = task5(wf_file_path, t4) + t1 = task1(wf_file_path) + t2 = task2(wf_file_path, t1) + t3 = task3(wf_file_path, t2) + t4 = task4(wf_file_path, t3) + t5 = task5(wf_file_path, t4) - await t5 + await t5 - print(f'[WF {wf_id}] Done: {time.time():.2f}') - return wf_file_path + print(f'[WF {wf_id}] Done: {time.time():.2f}') + return wf_file_path - try: num_workflows = 4 file_paths = await asyncio.gather(*[run_wf(i) for i in range(num_workflows)]) @@ -134,8 +130,6 @@ async def run_wf(wf_id): assert lines == ["task1", "task2", "task3", "task4", "task5"], \ f"Unexpected task sequence in {path}: {lines}" - finally: - await flow.shutdown() print("Workflow engine shutdown complete.") @@ -146,59 +140,58 @@ async def test_flow_mixed_function_and_executable_tasks(tmp_path): Function tasks modify state, while executable tasks log their invocation. 
""" backend = await ConcurrentExecutionBackend(ThreadPoolExecutor()) - flow = await WorkflowEngine.create(backend=backend) - - # Function tasks - @flow.function_task - async def f_task1(): - return {"steps": ["f_task1"], "value": 1} - - @flow.function_task - async def f_task2(state): - state["steps"].append("f_task2") - state["value"] += 2 - return state - - # Executable tasks - @flow.executable_task - async def e_task1(wf_file, state): - return f'echo "e_task1:{state["value"]}" >> {wf_file}' - - @flow.executable_task - async def e_task2(wf_file, t1): - return f'echo "e_task2" >> {wf_file}' - - @flow.function_task - async def f_task3(state): - state["steps"].append("f_task3") - state["value"] *= 3 - return state - - @flow.executable_task - async def e_task3(wf_file, t2): - return f'echo "e_task3" >> {wf_file}' - - async def run_wf(wf_id): - """ - Mixed function and executable tasks within one workflow. - """ - wf_file = tmp_path / f"wf_{wf_id}.log" - wf_file_path = str(wf_file) - - print(f'\n[WF {wf_id}] Start: {time.time():.2f}') - - s1 = await f_task1() - s2 = await f_task2(s1) - t1 = e_task1(wf_file_path, s2) - t2 = e_task2(wf_file_path, t1) - s3 = await f_task3(s2) - t3 = e_task3(wf_file_path, t2) - await t3 - - print(f'[WF {wf_id}] Done: {time.time():.2f} — Final state: {s3}') - return s3, wf_file - - try: + async with await WorkflowEngine.create(backend=backend) as flow: + + # Function tasks + @flow.function_task + async def f_task1(): + return {"steps": ["f_task1"], "value": 1} + + @flow.function_task + async def f_task2(state): + state["steps"].append("f_task2") + state["value"] += 2 + return state + + # Executable tasks + @flow.executable_task + async def e_task1(wf_file, state): + return f'echo "e_task1:{state["value"]}" >> {wf_file}' + + @flow.executable_task + async def e_task2(wf_file, t1): + return f'echo "e_task2" >> {wf_file}' + + @flow.function_task + async def f_task3(state): + state["steps"].append("f_task3") + state["value"] *= 3 + return state + + @flow.executable_task + async def e_task3(wf_file, t2): + return f'echo "e_task3" >> {wf_file}' + + async def run_wf(wf_id): + """ + Mixed function and executable tasks within one workflow. 
+            """
+            wf_file = tmp_path / f"wf_{wf_id}.log"
+            wf_file_path = str(wf_file)
+
+            print(f'\n[WF {wf_id}] Start: {time.time():.2f}')
+
+            s1 = await f_task1()
+            s2 = await f_task2(s1)
+            t1 = e_task1(wf_file_path, s2)
+            t2 = e_task2(wf_file_path, t1)
+            s3 = await f_task3(s2)
+            t3 = e_task3(wf_file_path, t2)
+            await t3
+
+            print(f'[WF {wf_id}] Done: {time.time():.2f} — Final state: {s3}')
+            return s3, wf_file
+
         num_workflows = 2
         results = await asyncio.gather(*[run_wf(i) for i in range(num_workflows)])
@@ -213,6 +206,4 @@ async def run_wf(wf_id):
             assert lines[1] == "e_task2"
             assert lines[2] == "e_task3"
 
-    finally:
-        await flow.shutdown()
         print("Workflow engine shutdown complete.")
diff --git a/tests/unit/test_dependencies_resloution.py b/tests/unit/test_dependencies_resloution.py
index 0e93e80..31ea38c 100644
--- a/tests/unit/test_dependencies_resloution.py
+++ b/tests/unit/test_dependencies_resloution.py
@@ -15,9 +15,8 @@ class TestFutureResolution:
     async def flow(self):
         """Create a WorkflowEngine instance for testing."""
         backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-        flow = await WorkflowEngine.create(backend=backend)
-        yield flow
-        await flow.shutdown()
+        async with await WorkflowEngine.create(backend=backend) as flow:
+            yield flow
 
     @pytest.mark.asyncio
     async def test_simple_future_resolution(self, flow):
@@ -235,9 +234,7 @@ async def test_real_workflow_scenario():
     """Integration test simulating a real workflow scenario."""
 
     backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-    flow = await WorkflowEngine.create(backend=backend)
-
-    try:
+    async with await WorkflowEngine.create(backend=backend) as flow:
         @flow.function_task
         async def fetch_data(source):
             """Simulate data fetching."""
@@ -269,5 +266,3 @@ async def combine_results(result1, result2):
 
     expected = "combined_processed_data_from_api_with_config_a_and_processed_data_from_database_with_config_b"
     assert result == expected
-
-    finally:
-        await flow.shutdown()
diff --git a/tests/unit/test_failure_propagation.py b/tests/unit/test_failure_propagation.py
index bcc49c4..1b178e0 100644
--- a/tests/unit/test_failure_propagation.py
+++ b/tests/unit/test_failure_propagation.py
@@ -14,10 +14,9 @@
 @pytest_asyncio.fixture
 async def flow():
     backend = await ConcurrentExecutionBackend(ThreadPoolExecutor())
-    flow = await WorkflowEngine.create(backend=backend)
-    yield flow
-    await asyncio.sleep(0)  # allow any pending tasks to finish
-    await flow.shutdown(skip_execution_backend=True)
+    async with await WorkflowEngine.create(backend=backend, skip_execution_backend=True) as flow:
+        yield flow
+        await asyncio.sleep(0)  # allow any pending tasks to finish
 
 @pytest.mark.asyncio
 async def test_dependency_failure_exception_creation(flow):
diff --git a/tests/unit/test_termination.py b/tests/unit/test_termination.py
index e59916b..ed7870a 100644
--- a/tests/unit/test_termination.py
+++ b/tests/unit/test_termination.py
@@ -6,15 +6,15 @@
 @pytest.mark.asyncio
 async def test_async_shutdown():
     """
-    Unit test: ensures `flow.shutdown()` completes without error in async context,
+    Unit test: ensures the context manager's implicit `flow.shutdown()` completes without error in an async context,
     and backend.shutdown() is called once.
""" backend = await ConcurrentExecutionBackend(ThreadPoolExecutor()) backend.shutdown = AsyncMock() - flow = await WorkflowEngine.create(backend=backend) + async with await WorkflowEngine.create(backend=backend) as flow: + pass try: - await flow.shutdown() backend.shutdown.assert_called_once() except Exception as e: pytest.fail(f"Async shutdown raised unexpected error: {e}")