Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8982e77
feat: workflows v2 initial code
michael-desmond Sep 17, 2025
73a4e24
fix: support OR in after decorator
michael-desmond Sep 18, 2025
823a7ec
feat: fork and join
michael-desmond Sep 19, 2025
b744641
fix: use set to check for dependency completion
michael-desmond Sep 22, 2025
4d04802
fix: example with fast slow looping
michael-desmond Sep 22, 2025
fd37063
Merge branch 'main' into feat/workflows_v2
michael-desmond Sep 22, 2025
7985f00
fix: chatmodel runnable interface
michael-desmond Sep 22, 2025
c519fd1
feat: end decorator, execution tracking
michael-desmond Sep 23, 2025
81a7fe0
feat: implement runnable
michael-desmond Sep 24, 2025
d70ad48
fix: refactor, retry decorator
michael-desmond Sep 30, 2025
fcf6adb
Merge branch 'main' into feat/workflows_v2
michael-desmond Sep 30, 2025
7776ad2
Merge branch 'main' into feat/workflows_v2
michael-desmond Oct 3, 2025
908e44d
feat: explicit and/or conditions
michael-desmond Oct 14, 2025
6468f39
Merge branch 'main' into feat/workflows_v2
michael-desmond Oct 14, 2025
93ff244
feat: print workflows to html
michael-desmond Oct 17, 2025
921c14b
feat: upstream parameter handling
michael-desmond Oct 30, 2025
979e020
Merge branch 'main' into feat/workflows_v2
michael-desmond Oct 30, 2025
2b48aa3
Merge branch 'main' into feat/workflows_v2
michael-desmond Nov 6, 2025
855952f
feat: direct workflow defintion via then and branch
michael-desmond Nov 6, 2025
592e010
fix: ruff formatting
michael-desmond Nov 6, 2025
9b6baea
feat: implicit start step, update examples
michael-desmond Nov 13, 2025
9667ea8
Merge branch 'main' into feat/workflows_v2
michael-desmond Nov 13, 2025
14b88a5
feat: ruff formatting
michael-desmond Nov 13, 2025
3673cbc
fix: remame messages to input
michael-desmond Nov 13, 2025
2485af9
feat: add finalize method to handle runnable output
michael-desmond Nov 13, 2025
440b714
feat: decompose step executor
michael-desmond Nov 13, 2025
589ab2f
feat: remove unused variable
michael-desmond Nov 13, 2025
b2f0f74
feat: parameter chaining between steps
michael-desmond Nov 13, 2025
bef3cfb
Merge branch 'main' into feat/workflows_v2
michael-desmond Nov 14, 2025
4239185
feat: introduce workflow chainable
michael-desmond Nov 14, 2025
f6db444
Merge branch 'main' into feat/workflows_v2
michael-desmond Nov 18, 2025
0af5c21
feat: track upstream completion, better step abstractions
michael-desmond Nov 18, 2025
8c8f6e3
Merge branch 'main' into feat/workflows_v2
michael-desmond Nov 19, 2025
d6c372c
fix: better graph structure, fix branching
michael-desmond Nov 19, 2025
6d518d5
fix: add type hint to sets in back edge detection
michael-desmond Nov 19, 2025
b2b1351
Merge branch 'main' into feat/workflows_v2
michael-desmond Nov 20, 2025
7313d54
fix: improve graph construction, optional branch fn, no end_step
michael-desmond Nov 20, 2025
32b47a0
Merge branch 'main' into feat/workflows_v2
michael-desmond Nov 25, 2025
7781216
fix: incorporate feedback from review
michael-desmond Nov 25, 2025
54444f3
Merge branch 'main' into feat/workflows_v2
michael-desmond Dec 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions python/beeai_framework/workflows/v2/decorators/_and.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from beeai_framework.workflows.v2.types import AsyncMethod, AsyncMethodSet


def _and(*methods: AsyncMethod | str) -> AsyncMethodSet:
asm = AsyncMethodSet()

for method in methods:
if isinstance(method, str):
asm.methods.append(method)
elif callable(method):
asm.methods.append(method.__name__)

return asm
17 changes: 17 additions & 0 deletions python/beeai_framework/workflows/v2/decorators/_or.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from beeai_framework.workflows.v2.types import AsyncMethod, AsyncMethodSet


def _or(*methods: AsyncMethod | str) -> AsyncMethodSet:
asm = AsyncMethodSet()

for method in methods:
if isinstance(method, str):
asm.methods.append(method)
elif callable(method):
asm.methods.append(method.__name__)

asm.condition = "or"
return asm
15 changes: 15 additions & 0 deletions python/beeai_framework/workflows/v2/decorators/after.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from collections.abc import Callable

from beeai_framework.workflows.v2.types import AsyncMethod, AsyncMethodSet


def after(dependency: str | AsyncMethod | AsyncMethodSet) -> Callable[[AsyncMethod], AsyncMethod]:
def decorator(func: AsyncMethod) -> AsyncMethod:
func._is_step = True # type: ignore[attr-defined]
func._dependency = dependency # type: ignore[attr-defined]
return func

return decorator
20 changes: 20 additions & 0 deletions python/beeai_framework/workflows/v2/decorators/end.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0


from collections.abc import Callable, Coroutine
from typing import Any, Concatenate, ParamSpec, TypeVar

from beeai_framework.backend.message import AnyMessage
from beeai_framework.workflows.v2.workflow import Workflow

S = TypeVar("S", bound=Workflow)
P = ParamSpec("P")

EndMethod = Callable[Concatenate[S, P], Coroutine[Any, Any, list[AnyMessage]]]


def end(func: EndMethod[S, P]) -> EndMethod[S, P]:
func._is_step = True # type: ignore[attr-defined]
func._is_end = True # type: ignore[attr-defined]
return func
9 changes: 9 additions & 0 deletions python/beeai_framework/workflows/v2/decorators/fork.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from beeai_framework.workflows.v2.types import AsyncMethod


def fork(func: AsyncMethod) -> AsyncMethod:
func._is_fork = True # type: ignore[attr-defined]
return func
9 changes: 9 additions & 0 deletions python/beeai_framework/workflows/v2/decorators/join.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from beeai_framework.workflows.v2.types import AsyncMethod


def join(func: AsyncMethod) -> AsyncMethod:
func._is_join = True # type: ignore[attr-defined]
return func
15 changes: 15 additions & 0 deletions python/beeai_framework/workflows/v2/decorators/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0


from collections.abc import Callable

from beeai_framework.workflows.v2.types import AsyncMethod


def retry(n: int = 1) -> Callable[[AsyncMethod], AsyncMethod]:
def decorator(func: AsyncMethod) -> AsyncMethod:
func._retries = n # type: ignore[attr-defined]
return func

return decorator
19 changes: 19 additions & 0 deletions python/beeai_framework/workflows/v2/decorators/start.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0


from collections.abc import Callable, Coroutine
from typing import Any, TypeVar

from beeai_framework.backend.message import AnyMessage
from beeai_framework.workflows.v2.workflow import Workflow

S = TypeVar("S", bound=Workflow)
AnyCoroutine = Coroutine[Any, Any, Any]
StartMethod = Callable[[S, list[AnyMessage]], AnyCoroutine]


def start(func: StartMethod[S]) -> StartMethod[S]:
func._is_step = True # type: ignore[attr-defined]
func._is_start = True # type: ignore[attr-defined]
return func
20 changes: 20 additions & 0 deletions python/beeai_framework/workflows/v2/decorators/when.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from collections.abc import Callable

from beeai_framework.workflows.v2.types import AsyncMethod

Predicate = Callable[..., bool]


def when(predicate: Predicate) -> Callable[[AsyncMethod], AsyncMethod]:
"""
Async decorator: runs the async function only if `predicate` returns True.
"""

def decorator(func: AsyncMethod) -> AsyncMethod:
func._when_predicate = predicate # type: ignore[attr-defined]
return func

return decorator
40 changes: 40 additions & 0 deletions python/beeai_framework/workflows/v2/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from pydantic import BaseModel, ConfigDict

from beeai_framework.workflows.v2.step import WorkflowStep


class WorkflowStartEvent(BaseModel): ...


class WorkflowStepEvent(BaseModel):
step: WorkflowStep
model_config = ConfigDict(arbitrary_types_allowed=True)


class WorkflowStartStepEvent(WorkflowStepEvent):
pass


class WorkflowErrorEvent(WorkflowStepEvent):
error: Exception
attempt: int


class WorkflowRetryStepEvent(WorkflowStepEvent):
error: Exception
attempt: int


class WorkflowSuccessEvent(BaseModel): ...


workflow_v2_event_types: dict[str, type] = {
"start": WorkflowStartEvent,
"start_step": WorkflowStartStepEvent,
"error": WorkflowErrorEvent,
"retry_step": WorkflowRetryStepEvent,
"success": WorkflowSuccessEvent,
}
115 changes: 115 additions & 0 deletions python/beeai_framework/workflows/v2/step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

import asyncio
from datetime import datetime
from typing import Any

from pydantic import BaseModel, ConfigDict

from beeai_framework.workflows.v2.decorators.when import Predicate
from beeai_framework.workflows.v2.types import AsyncMethod, ExecutionCondition


class WorkflowStepExecution(BaseModel):
inputs: list[Any] = []
output: Any | None
error: Exception | None
started_at: datetime | None
ended_at: datetime | None
duration: float

model_config = ConfigDict(arbitrary_types_allowed=True)


class WorkflowStep:
def __init__(
self,
func: AsyncMethod,
start: bool = False,
end: bool = False,
fork: bool = False,
join: bool = False,
retries: int = 0,
) -> None:
self._func = func
self._name = func.__name__

self._is_start = start
self._is_end = end
self._is_fork = fork
self._is_join = join
self._retries = retries

self._condition: ExecutionCondition = "and"
self.forked: list[WorkflowStep] = []
self._dependencies: list[WorkflowStep] = []
self._dependents: list[WorkflowStep] = []
self.completed_dependencies: list[WorkflowStep] = []
self.inputs: dict[int, Any] = {}

self.completed_event = asyncio.Event()
self._predicates: list[Predicate] = []
self._executions: list[WorkflowStepExecution] = []

@property
def name(self) -> str:
return self._name

@property
def func(self) -> AsyncMethod:
return self._func

@property
def is_start(self) -> bool:
return self._is_start

@property
def is_end(self) -> bool:
return self._is_end

@property
def is_fork(self) -> bool:
return self._is_fork

@property
def is_join(self) -> bool:
return self._is_join

@property
def retries(self) -> int:
return self._retries

@property
def condition(self) -> ExecutionCondition:
return self._condition

@condition.setter
def condition(self, value: ExecutionCondition) -> None:
self._condition = value

def add_predicate(self, predicate: Predicate) -> None:
self._predicates.append(predicate)

@property
def predicates(self) -> list[Predicate]:
return self._predicates

@property
def executions(self) -> list[WorkflowStepExecution]:
return self._executions

def add_dependency(self, dep: "WorkflowStep") -> None:
self._dependencies.append(dep)
dep._dependents.append(self)

@property
def dependencies(self) -> list["WorkflowStep"]:
return self._dependencies

@property
def dependents(self) -> list["WorkflowStep"]:
return self._dependents

def last_execution(self) -> WorkflowStepExecution | None:
return self.executions[-1] if self.executions else None
16 changes: 16 additions & 0 deletions python/beeai_framework/workflows/v2/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from collections.abc import Callable, Coroutine
from typing import Any, Literal

from pydantic import BaseModel

AsyncMethod = Callable[..., Coroutine[Any, Any, Any]]

ExecutionCondition = Literal["and", "or"]


class AsyncMethodSet(BaseModel):
methods: list[str] = []
condition: ExecutionCondition = "and"
Loading