Skip to content

Conversation

@michael-desmond
Copy link
Member

Which issue(s) does this pull-request address?

Addresses #1025

Description

Lightweight agent orchestration using decorators.

Checklist

General

Code quality checks

  • Code quality checks pass: mise check (mise fix to auto-fix)

Testing

  • Unit tests pass: mise test:unit
  • E2E tests pass: mise test:e2e
  • Tests are included (for bug fixes or new features)

Documentation

  • Documentation is updated
  • Embedme embeds code examples in docs. To update after edits, run: Python mise docs:fix

Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
@michael-desmond michael-desmond added the python Python related functionality label Sep 22, 2025
michael-desmond and others added 26 commits September 22, 2025 14:48
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Signed-off-by: MICHAEL DESMOND <[email protected]>
Comment on lines +183 to +187
output: RunnableOutput = (
completed_steps[-1].result
if completed_steps and isinstance(completed_steps[-1].result, RunnableOutput)
else RunnableOutput(output=[])
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In parallel/branching graphs, this is nondeterministic. Workflows with multiple terminal leaves may not return the intended value. Is this handled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the .after operator it is possible to build a workflow with multiple terminal steps, in which case you get an empty RunnableOutput from the outer workflow instance. Its up to the programmer of the workflow to properly join to a terminal step that returns RunnableOutput.

Comment on lines +146 to +169
# Enqueue downstream
for ds_edge in step._downstream_edges:
ds = ds_edge.target
enqueue_ds = True
# Check all upstream edges to determine execution conditions
for up_edge in ds._upstream_edges:
up = up_edge.source

# If upstream has not executed and is its an and edge dont queue
if not up.has_executed and up_edge.type == "and":
enqueue_ds = False
break

# Check for conditional execution
if enqueue_ds and ds_edge.condition is not None:
if ds_edge.condition.fn:
enqueue_ds = bool(await run_callable(ds_edge.condition.fn, *results) == ds_edge.condition.key)
else:
enqueue_ds = bool(step.result == ds_edge.condition.key)

if enqueue_ds:
ds.has_executed = False
await queue.put(ds)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a downstream step has multiple upstream edges marked as or, each upstream completion may enqueue the downstream, causing multiple concurrent executions and potential state races.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but the Workflow builder is designed to prevent this situation so that the workflow is predictable. I.e. parallel steps have an implicit downstream join step with and edges. And you cant call .then after .branch because you would be joining multiple upstream into a single downstream. This is the tricky part of the entire system.


if tasks:
# done, _ = await asyncio.wait(tasks)
done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that other tasks should be awaited or cancelled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you await all of the tasks here then the system will synchronize to the current level in the queue and it wont be able to queue downstream steps that can safely be executed concurrently.

T = TypeVar("T", bound="Workflow")


class step: # noqa: N801
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a function instead of a class.

Comment on lines +45 to +49
def __get__(self, instance: None, owner: type[T]) -> "step": ...
@overload
def __get__(self, instance: T, owner: type[T]) -> WorkflowStep: ...

def __get__(self, instance: T | None, owner: type[T]) -> Any:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These overloads might not be needed as we are no longer using is_start etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These overloads are necessary so that mypy wont get upset about the class being step but the return value of the get being a WorkflowStep.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

python Python related functionality

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants